You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/10 21:42:16 UTC
[1/2] incubator-beam git commit: [BEAM-944] Spark runner causes an
exception when creating pipeline options. Create a SparkContextOptions for
context-aware options.
Repository: incubator-beam
Updated Branches:
refs/heads/master cd3f61cf8 -> e43a38355
[BEAM-944] Spark runner causes an exception when creating pipeline options.
Create a SparkContextOptions for context-aware options.
Move UsesProvidedSparkContext property to SparkPipelineOptions so it's available from command-line
as well.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46
Branch: refs/heads/master
Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5
Parents: cd3f61c
Author: Sela <an...@paypal.com>
Authored: Tue Nov 8 23:05:13 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Thu Nov 10 23:27:17 2016 +0200
----------------------------------------------------------------------
.../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++
.../runners/spark/SparkPipelineOptions.java | 36 +++--------
.../spark/translation/SparkContextFactory.java | 19 +++---
.../SparkRunnerStreamingContextFactory.java | 3 +-
.../runners/spark/ProvidedSparkContextTest.java | 6 +-
.../streaming/KafkaStreamingTest.java | 4 +-
6 files changed, 91 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
new file mode 100644
index 0000000..98f7492
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
+
+
+/**
+ * A custom {@link PipelineOptions} to work with properties related to {@link JavaSparkContext}.
+ *
+ * <p>This can only be used programmatically (as opposed to passing command line arguments),
+ * since the properties here are context-aware and should not be propagated to workers.
+ *
+ * <p>Separating this from {@link SparkPipelineOptions} is needed so the context-aware properties,
+ * which link to Spark dependencies, won't be scanned by {@link PipelineOptions}
+ * reflective instantiation.
+ * Note that {@link SparkContextOptions} is not registered with {@link SparkRunnerRegistrar}.
+ */
+public interface SparkContextOptions extends SparkPipelineOptions {
+
+ @Description("Provided Java Spark Context")
+ @JsonIgnore
+ JavaSparkContext getProvidedSparkContext();
+ void setProvidedSparkContext(JavaSparkContext jsc);
+
+ @Description("Spark streaming listeners")
+ @Default.InstanceFactory(EmptyListenersList.class)
+ @JsonIgnore
+ List<JavaStreamingListener> getListeners();
+ void setListeners(List<JavaStreamingListener> listeners);
+
+ /** Returns an empty list, top avoid handling null. */
+ class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
+ @Override
+ public List<JavaStreamingListener> create(PipelineOptions options) {
+ return new ArrayList<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4eada35..5168c6c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,24 +18,22 @@
package org.apache.beam.runners.spark;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
/**
- * Spark runner pipeline options.
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations,
+ * such as the master address, batch-interval, and other user-related knobs.
*/
-public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
- ApplicationNameOptions {
+public interface SparkPipelineOptions
+ extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
+
@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
@Default.String("local[4]")
String getSparkMaster();
@@ -93,27 +91,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
Boolean getEnableSparkSinks();
void setEnableSparkSinks(Boolean enableSparkSinks);
- @Description("If the spark runner will be initialized with a provided Spark Context")
+ @Description("If the spark runner will be initialized with a provided Spark Context. "
+ + "The Spark Context should be provided with SparkContextOptions.")
@Default.Boolean(false)
boolean getUsesProvidedSparkContext();
void setUsesProvidedSparkContext(boolean value);
-
- @Description("Provided Java Spark Context")
- @JsonIgnore
- JavaSparkContext getProvidedSparkContext();
- void setProvidedSparkContext(JavaSparkContext jsc);
-
- @Description("Spark streaming listeners")
- @Default.InstanceFactory(EmptyListenersList.class)
- @JsonIgnore
- List<JavaStreamingListener> getListeners();
- void setListeners(List<JavaStreamingListener> listeners);
-
- /** Returns an empty list, top avoid handling null. */
- class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
- @Override
- public List<JavaStreamingListener> create(PipelineOptions options) {
- return new ArrayList<>();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index ee2104a..c7f90b4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark.translation;
+import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator;
import org.apache.spark.SparkConf;
@@ -46,11 +47,13 @@ public final class SparkContextFactory {
}
public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
+ SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
// reuse should be ignored if the context is provided.
- if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !options.getUsesProvidedSparkContext()) {
+ if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)
+ && !contextOptions.getUsesProvidedSparkContext()) {
// if the context is null or stopped for some reason, re-create it.
if (sparkContext == null || sparkContext.sc().isStopped()) {
- sparkContext = createSparkContext(options);
+ sparkContext = createSparkContext(contextOptions);
sparkMaster = options.getSparkMaster();
} else if (!options.getSparkMaster().equals(sparkMaster)) {
throw new IllegalArgumentException(String.format("Cannot reuse spark context "
@@ -59,7 +62,7 @@ public final class SparkContextFactory {
}
return sparkContext;
} else {
- return createSparkContext(options);
+ return createSparkContext(contextOptions);
}
}
@@ -69,10 +72,10 @@ public final class SparkContextFactory {
}
}
- private static JavaSparkContext createSparkContext(SparkPipelineOptions options) {
- if (options.getUsesProvidedSparkContext()) {
+ private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) {
+ if (contextOptions.getUsesProvidedSparkContext()) {
LOG.info("Using a provided Spark Context");
- JavaSparkContext jsc = options.getProvidedSparkContext();
+ JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
if (jsc == null || jsc.sc().isStopped()){
LOG.error("The provided Spark context " + jsc + " was not created or was stopped");
throw new RuntimeException("The provided Spark context was not created or was stopped");
@@ -83,9 +86,9 @@ public final class SparkContextFactory {
SparkConf conf = new SparkConf();
if (!conf.contains("spark.master")) {
// set master if not set.
- conf.setMaster(options.getSparkMaster());
+ conf.setMaster(contextOptions.getSparkMaster());
}
- conf.setAppName(options.getAppName());
+ conf.setAppName(contextOptions.getAppName());
// register immutable collections serializers because the SDK uses them.
conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName());
conf.set("spark.serializer", KryoSerializer.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index a670f61..f8ee8ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -86,7 +87,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
jssc.checkpoint(checkpointDir);
// register listeners.
- for (JavaStreamingListener listener: options.getListeners()) {
+ for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) {
LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index cbc5976..c225073 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -56,7 +56,7 @@ public class ProvidedSparkContextTest {
public void testWithProvidedContext() throws Exception {
JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
@@ -83,7 +83,7 @@ public class ProvidedSparkContextTest {
public void testWithNullContext() throws Exception {
JavaSparkContext jsc = null;
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
@@ -114,7 +114,7 @@ public class ProvidedSparkContextTest {
// Stop the provided Spark context directly
jsc.stop();
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index f01059f..29e4609 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
@@ -121,7 +122,8 @@ public class KafkaStreamingTest {
@Test
public void testLatest() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+ SparkContextOptions options =
+ commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class);
//--- setup
final String topic = "topic";
// messages.
[2/2] incubator-beam git commit: This closes #1316
Posted by am...@apache.org.
This closes #1316
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43a3835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43a3835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43a3835
Branch: refs/heads/master
Commit: e43a383559cb825a498c7427d58ce0a56b3f5245
Parents: cd3f61c 121bff4
Author: Sela <an...@paypal.com>
Authored: Thu Nov 10 23:27:55 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Thu Nov 10 23:27:55 2016 +0200
----------------------------------------------------------------------
.../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++
.../runners/spark/SparkPipelineOptions.java | 36 +++--------
.../spark/translation/SparkContextFactory.java | 19 +++---
.../SparkRunnerStreamingContextFactory.java | 3 +-
.../runners/spark/ProvidedSparkContextTest.java | 6 +-
.../streaming/KafkaStreamingTest.java | 4 +-
6 files changed, 91 insertions(+), 41 deletions(-)
----------------------------------------------------------------------