You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/03/07 06:38:06 UTC

[1/2] beam git commit: [BEAM-1556] Make PipelineOptions a lazy-singleton and init IOs as part of it.

Repository: beam
Updated Branches:
  refs/heads/master 410534b1f -> 1fd52f53c


[BEAM-1556] Make PipelineOptions a lazy-singleton and init IOs as part of it.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4dda585c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4dda585c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4dda585c

Branch: refs/heads/master
Commit: 4dda585cda61a775e2d616fa5c25698f490b9cd3
Parents: 410534b
Author: Sela <an...@paypal.com>
Authored: Mon Mar 6 11:17:00 2017 +0200
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 6 22:37:41 2017 -0800

----------------------------------------------------------------------
 .../spark/translation/SparkRuntimeContext.java  | 29 ++++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4dda585c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 9c3d79f..4ccfead 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.spark.Accumulator;
 
 /**
@@ -40,12 +41,10 @@ import org.apache.spark.Accumulator;
  */
 public class SparkRuntimeContext implements Serializable {
   private final String serializedPipelineOptions;
+  private transient CoderRegistry coderRegistry;
 
-  /**
-   * Map fo names to Beam aggregators.
-   */
+  // map for names to Beam aggregators.
   private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-  private transient CoderRegistry coderRegistry;
 
   SparkRuntimeContext(Pipeline pipeline) {
     this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
@@ -67,8 +66,8 @@ public class SparkRuntimeContext implements Serializable {
     }
   }
 
-  public synchronized PipelineOptions getPipelineOptions() {
-    return deserializePipelineOptions(serializedPipelineOptions);
+  public PipelineOptions getPipelineOptions() {
+    return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
   }
 
   /**
@@ -118,6 +117,24 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
+  private static class PipelineOptionsHolder {
+    // on executors, this should deserialize once.
+    private static transient volatile PipelineOptions pipelineOptions = null;
+
+    static PipelineOptions getOrInit(String serializedPipelineOptions) {
+      if (pipelineOptions == null) {
+        synchronized (PipelineOptionsHolder.class) {
+          if (pipelineOptions == null) {
+            pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
+          }
+        }
+        // register IO factories.
+        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+      }
+      return pipelineOptions;
+    }
+  }
+
   /**
    * Initialize spark aggregators exactly once.
    *


[2/2] beam git commit: This closes #2169

Posted by da...@apache.org.
This closes #2169


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fd52f53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fd52f53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fd52f53

Branch: refs/heads/master
Commit: 1fd52f53c92ece265ed41bd7f2a06b0cdf6f8afd
Parents: 410534b 4dda585
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 6 22:37:55 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 6 22:37:55 2017 -0800

----------------------------------------------------------------------
 .../spark/translation/SparkRuntimeContext.java  | 29 ++++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------