You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/10 21:42:16 UTC

[1/2] incubator-beam git commit: [BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options.

Repository: incubator-beam
Updated Branches:
  refs/heads/master cd3f61cf8 -> e43a38355


[BEAM-944] Spark runner causes an exception when creating pipeline options.
Create SparkContextOptions for context-aware options.

Keep the UsesProvidedSparkContext property in SparkPipelineOptions so that it remains available from
the command line as well.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46

Branch: refs/heads/master
Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5
Parents: cd3f61c
Author: Sela <an...@paypal.com>
Authored: Tue Nov 8 23:05:13 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Thu Nov 10 23:27:17 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++
 .../runners/spark/SparkPipelineOptions.java     | 36 +++--------
 .../spark/translation/SparkContextFactory.java  | 19 +++---
 .../SparkRunnerStreamingContextFactory.java     |  3 +-
 .../runners/spark/ProvidedSparkContextTest.java |  6 +-
 .../streaming/KafkaStreamingTest.java           |  4 +-
 6 files changed, 91 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
new file mode 100644
index 0000000..98f7492
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
+
+
+/**
+ * A custom {@link PipelineOptions} to work with properties related to {@link JavaSparkContext}.
+ *
+ * <p>This can only be used programmatically (as opposed to passing command line arguments),
+ * since the properties here are context-aware and should not be propagated to workers.
+ *
+ * <p>Separating this from {@link SparkPipelineOptions} is needed so the context-aware properties,
+ * which link to Spark dependencies, won't be scanned by {@link PipelineOptions}
+ * reflective instantiation.
+ * Note that {@link SparkContextOptions} is not registered with {@link SparkRunnerRegistrar}.
+ */
+public interface SparkContextOptions extends SparkPipelineOptions {
+
+  @Description("Provided Java Spark Context")
+  @JsonIgnore
+  JavaSparkContext getProvidedSparkContext();
+  void setProvidedSparkContext(JavaSparkContext jsc);
+
+  @Description("Spark streaming listeners")
+  @Default.InstanceFactory(EmptyListenersList.class)
+  @JsonIgnore
+  List<JavaStreamingListener> getListeners();
+  void setListeners(List<JavaStreamingListener> listeners);
+
+  /** Returns an empty list, to avoid handling null. */
+  class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
+    @Override
+    public List<JavaStreamingListener> create(PipelineOptions options) {
+      return new ArrayList<>();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4eada35..5168c6c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,24 +18,22 @@
 
 package org.apache.beam.runners.spark;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
 
 
 /**
- * Spark runner pipeline options.
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations,
+ * such as the master address, batch-interval, and other user-related knobs.
  */
-public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
-                                              ApplicationNameOptions {
+public interface SparkPipelineOptions
+    extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
+
   @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
   @Default.String("local[4]")
   String getSparkMaster();
@@ -93,27 +91,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   Boolean getEnableSparkSinks();
   void setEnableSparkSinks(Boolean enableSparkSinks);
 
-  @Description("If the spark runner will be initialized with a provided Spark Context")
+  @Description("If the spark runner will be initialized with a provided Spark Context. "
+      + "The Spark Context should be provided with SparkContextOptions.")
   @Default.Boolean(false)
   boolean getUsesProvidedSparkContext();
   void setUsesProvidedSparkContext(boolean value);
-
-  @Description("Provided Java Spark Context")
-  @JsonIgnore
-  JavaSparkContext getProvidedSparkContext();
-  void setProvidedSparkContext(JavaSparkContext jsc);
-
-  @Description("Spark streaming listeners")
-  @Default.InstanceFactory(EmptyListenersList.class)
-  @JsonIgnore
-  List<JavaStreamingListener> getListeners();
-  void setListeners(List<JavaStreamingListener> listeners);
-
-  /** Returns an empty list, top avoid handling null. */
-  class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
-    @Override
-    public List<JavaStreamingListener> create(PipelineOptions options) {
-      return new ArrayList<>();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index ee2104a..c7f90b4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator;
 import org.apache.spark.SparkConf;
@@ -46,11 +47,13 @@ public final class SparkContextFactory {
   }
 
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
+    SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
     // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !options.getUsesProvidedSparkContext()) {
+    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)
+        && !contextOptions.getUsesProvidedSparkContext()) {
       // if the context is null or stopped for some reason, re-create it.
       if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(options);
+        sparkContext = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
       } else if (!options.getSparkMaster().equals(sparkMaster)) {
         throw new IllegalArgumentException(String.format("Cannot reuse spark context "
@@ -59,7 +62,7 @@ public final class SparkContextFactory {
       }
       return sparkContext;
     } else {
-      return createSparkContext(options);
+      return createSparkContext(contextOptions);
     }
   }
 
@@ -69,10 +72,10 @@ public final class SparkContextFactory {
     }
   }
 
-  private static JavaSparkContext createSparkContext(SparkPipelineOptions options) {
-    if (options.getUsesProvidedSparkContext()) {
+  private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) {
+    if (contextOptions.getUsesProvidedSparkContext()) {
       LOG.info("Using a provided Spark Context");
-      JavaSparkContext jsc = options.getProvidedSparkContext();
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
       if (jsc == null || jsc.sc().isStopped()){
         LOG.error("The provided Spark context " + jsc + " was not created or was stopped");
         throw new RuntimeException("The provided Spark context was not created or was stopped");
@@ -83,9 +86,9 @@ public final class SparkContextFactory {
       SparkConf conf = new SparkConf();
       if (!conf.contains("spark.master")) {
         // set master if not set.
-        conf.setMaster(options.getSparkMaster());
+        conf.setMaster(contextOptions.getSparkMaster());
       }
-      conf.setAppName(options.getAppName());
+      conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName());
       conf.set("spark.serializer", KryoSerializer.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index a670f61..f8ee8ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -86,7 +87,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     jssc.checkpoint(checkpointDir);
 
     // register listeners.
-    for (JavaStreamingListener listener: options.getListeners()) {
+    for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) {
       LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
       jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index cbc5976..c225073 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -56,7 +56,7 @@ public class ProvidedSparkContextTest {
     public void testWithProvidedContext() throws Exception {
         JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
 
-        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);
@@ -83,7 +83,7 @@ public class ProvidedSparkContextTest {
     public void testWithNullContext() throws Exception {
         JavaSparkContext jsc = null;
 
-        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);
@@ -114,7 +114,7 @@ public class ProvidedSparkContextTest {
         // Stop the provided Spark context directly
         jsc.stop();
 
-        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index f01059f..29e4609 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
@@ -121,7 +122,8 @@ public class KafkaStreamingTest {
 
   @Test
   public void testLatest() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    SparkContextOptions options =
+        commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class);
     //--- setup
     final String topic = "topic";
     // messages.


[2/2] incubator-beam git commit: This closes #1316

Posted by am...@apache.org.
This closes #1316


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43a3835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43a3835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43a3835

Branch: refs/heads/master
Commit: e43a383559cb825a498c7427d58ce0a56b3f5245
Parents: cd3f61c 121bff4
Author: Sela <an...@paypal.com>
Authored: Thu Nov 10 23:27:55 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Thu Nov 10 23:27:55 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++
 .../runners/spark/SparkPipelineOptions.java     | 36 +++--------
 .../spark/translation/SparkContextFactory.java  | 19 +++---
 .../SparkRunnerStreamingContextFactory.java     |  3 +-
 .../runners/spark/ProvidedSparkContextTest.java |  6 +-
 .../streaming/KafkaStreamingTest.java           |  4 +-
 6 files changed, 91 insertions(+), 41 deletions(-)
----------------------------------------------------------------------