Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/02 18:17:11 UTC

[3/4] beam git commit: [BEAM-2670] Fixes SparkRuntimeContext.getPipelineOptions()

[BEAM-2670] Fixes SparkRuntimeContext.getPipelineOptions()

It used a static (JVM-global) variable to store the deserialized options,
so even if several instances of SparkRuntimeContext were created
with different PipelineOptions, they would all return the same
value: whichever one happened to be deserialized first.
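To illustrate the change without the Beam classes, here is a minimal, self-contained
sketch of the per-instance memoization pattern the fix adopts via Guava's
Suppliers.memoize and Suppliers.compose. The class and method names below
(PerInstanceMemoizationSketch, OptionsContext, getOptions) are made up for this
example; only the Suppliers/Function usage mirrors the actual commit. Each instance
deserializes lazily and caches the result in its own supplier, so two instances
built from different options no longer clobber each other through a shared static field.

  import com.google.common.base.Function;
  import com.google.common.base.Supplier;
  import com.google.common.base.Suppliers;

  // Hypothetical stand-in class illustrating the pattern used by the fix:
  // each instance memoizes its own deserialization result instead of caching
  // it in a static field shared across the JVM.
  public class PerInstanceMemoizationSketch {

    static class OptionsContext {
      private final Supplier<String> optionsSupplier;

      OptionsContext(String serializedOptions) {
        // Same shape as the fix: compose a deserialization Function with a
        // constant Supplier, then memoize so the work happens at most once
        // per OptionsContext instance.
        this.optionsSupplier =
            Suppliers.memoize(
                Suppliers.compose(
                    new Function<String, String>() {
                      @Override
                      public String apply(String s) {
                        // Stands in for the Jackson readValue call in the real code.
                        return "deserialized(" + s + ")";
                      }
                    },
                    Suppliers.ofInstance(serializedOptions)));
      }

      String getOptions() {
        return optionsSupplier.get();
      }
    }

    public static void main(String[] args) {
      OptionsContext a = new OptionsContext("optionsA");
      OptionsContext b = new OptionsContext("optionsB");
      // With the old static holder, both calls would have returned whichever
      // value was deserialized first; now each instance keeps its own.
      System.out.println(a.getOptions());  // deserialized(optionsA)
      System.out.println(b.getOptions());  // deserialized(optionsB)
    }
  }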


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff4b36c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff4b36c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff4b36c8

Branch: refs/heads/master
Commit: ff4b36c8ae1bd5e436ad63a32997273c8b4a97fe
Parents: 0a358c7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 27 13:05:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 2 11:04:50 2017 -0700

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |  2 +-
 .../spark/translation/SparkRuntimeContext.java  | 48 ++++++++------------
 .../translation/SparkRuntimeContextTest.java    |  2 +-
 3 files changed, 22 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..23e430a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -65,7 +65,7 @@ public class EvaluationContext {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.options = options;
-    this.runtime = new SparkRuntimeContext(pipeline, options);
+    this.runtime = new SparkRuntimeContext(options);
   }
 
   public EvaluationContext(

http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index f3fe99c..6361bb2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -21,11 +21,12 @@ package org.apache.beam.runners.spark.translation;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -34,11 +35,16 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  * data flow program is launched.
  */
 public class SparkRuntimeContext implements Serializable {
-  private final String serializedPipelineOptions;
+  private final Supplier<PipelineOptions> optionsSupplier;
   private transient CoderRegistry coderRegistry;
 
-  SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) {
-    this.serializedPipelineOptions = serializePipelineOptions(options);
+  SparkRuntimeContext(PipelineOptions options) {
+    String serializedPipelineOptions = serializePipelineOptions(options);
+    this.optionsSupplier =
+        Suppliers.memoize(
+            Suppliers.compose(
+                new DeserializeOptions(),
+                Suppliers.ofInstance(serializedPipelineOptions)));
   }
 
   /**
@@ -59,16 +65,8 @@ public class SparkRuntimeContext implements Serializable {
     }
   }
 
-  private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) {
-    try {
-      return createMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-    }
-  }
-
   public PipelineOptions getPipelineOptions() {
-    return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
+    return optionsSupplier.get();
   }
 
   public CoderRegistry getCoderRegistry() {
@@ -78,21 +76,15 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private static class PipelineOptionsHolder {
-    // on executors, this should deserialize once.
-    private static transient volatile PipelineOptions pipelineOptions = null;
-
-    static PipelineOptions getOrInit(String serializedPipelineOptions) {
-      if (pipelineOptions == null) {
-        synchronized (PipelineOptionsHolder.class) {
-          if (pipelineOptions == null) {
-            pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
-          }
-        }
-        // Register standard FileSystems.
-        FileSystems.setDefaultPipelineOptions(pipelineOptions);
+  private static class DeserializeOptions
+      implements Function<String, PipelineOptions>, Serializable {
+    @Override
+    public PipelineOptions apply(String options) {
+      try {
+        return createMapper().readValue(options, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
       }
-      return pipelineOptions;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
index e8f578a..456056a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
@@ -105,7 +105,7 @@ public class SparkRuntimeContextTest {
         .as(JacksonIncompatibleOptions.class);
     options.setRunner(CrashingRunner.class);
     Pipeline p = Pipeline.create(options);
-    SparkRuntimeContext context = new SparkRuntimeContext(p, options);
+    SparkRuntimeContext context = new SparkRuntimeContext(options);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {