You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/03/07 06:38:06 UTC
[1/2] beam git commit: [BEAM-1556] Make PipelineOptions a
lazy-singleton and init IOs as part of it.
Repository: beam
Updated Branches:
refs/heads/master 410534b1f -> 1fd52f53c
[BEAM-1556] Make PipelineOptions a lazy-singleton and init IOs as part of it.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4dda585c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4dda585c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4dda585c
Branch: refs/heads/master
Commit: 4dda585cda61a775e2d616fa5c25698f490b9cd3
Parents: 410534b
Author: Sela <an...@paypal.com>
Authored: Mon Mar 6 11:17:00 2017 +0200
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 6 22:37:41 2017 -0800
----------------------------------------------------------------------
.../spark/translation/SparkRuntimeContext.java | 29 ++++++++++++++++----
1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4dda585c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 9c3d79f..4ccfead 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.spark.Accumulator;
/**
@@ -40,12 +41,10 @@ import org.apache.spark.Accumulator;
*/
public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;
+ private transient CoderRegistry coderRegistry;
- /**
- * Map fo names to Beam aggregators.
- */
+ // map for names to Beam aggregators.
private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
- private transient CoderRegistry coderRegistry;
SparkRuntimeContext(Pipeline pipeline) {
this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
@@ -67,8 +66,8 @@ public class SparkRuntimeContext implements Serializable {
}
}
- public synchronized PipelineOptions getPipelineOptions() {
- return deserializePipelineOptions(serializedPipelineOptions);
+ public PipelineOptions getPipelineOptions() {
+ return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
}
/**
@@ -118,6 +117,24 @@ public class SparkRuntimeContext implements Serializable {
return coderRegistry;
}
+ private static class PipelineOptionsHolder {
+ // on executors, this should deserialize once.
+ private static transient volatile PipelineOptions pipelineOptions = null;
+
+ static PipelineOptions getOrInit(String serializedPipelineOptions) {
+ if (pipelineOptions == null) {
+ synchronized (PipelineOptionsHolder.class) {
+ if (pipelineOptions == null) {
+ pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
+ }
+ }
+ // register IO factories.
+ IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ }
+ return pipelineOptions;
+ }
+ }
+
/**
* Initialize spark aggregators exactly once.
*
[2/2] beam git commit: This closes #2169
Posted by da...@apache.org.
This closes #2169
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fd52f53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fd52f53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fd52f53
Branch: refs/heads/master
Commit: 1fd52f53c92ece265ed41bd7f2a06b0cdf6f8afd
Parents: 410534b 4dda585
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 6 22:37:55 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 6 22:37:55 2017 -0800
----------------------------------------------------------------------
.../spark/translation/SparkRuntimeContext.java | 29 ++++++++++++++++----
1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------