You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/08/29 12:05:21 UTC
[1/2] incubator-beam git commit: [BEAM-313] Provide a context for
SparkRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master 676843e04 -> 3666c22cb
[BEAM-313] Provide a context for SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/017da7ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/017da7ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/017da7ba
Branch: refs/heads/master
Commit: 017da7bac3e844ef7391aabbcbaf86c9c99af968
Parents: 676843e
Author: Abbass MAROUNI <am...@talend.com>
Authored: Mon Aug 29 13:28:46 2016 +0200
Committer: Abbass MAROUNI <am...@talend.com>
Committed: Mon Aug 29 13:28:46 2016 +0200
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 13 ++
.../apache/beam/runners/spark/SparkRunner.java | 16 ++-
.../runners/spark/ProvidedSparkContextTest.java | 138 +++++++++++++++++++
3 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index be4f7f0..db6b75c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,11 +18,14 @@
package org.apache.beam.runners.spark;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.spark.api.java.JavaSparkContext;
/**
* Spark runner pipeline options.
@@ -49,4 +52,14 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
@Default.Boolean(true)
Boolean getEnableSparkSinks();
void setEnableSparkSinks(Boolean enableSparkSinks);
+
+ @Description("If the spark runner will be initialized with a provided Spark Context")
+ @Default.Boolean(false)
+ boolean getUsesProvidedSparkContext();
+ void setUsesProvidedSparkContext(boolean value);
+
+ @Description("Provided Java Spark Context")
+ @JsonIgnore
+ JavaSparkContext getProvidedSparkContext();
+ void setProvidedSparkContext(JavaSparkContext jsc);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index fa85a2e..9f1a839 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -143,9 +143,19 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
public EvaluationResult run(Pipeline pipeline) {
try {
LOG.info("Executing pipeline using the SparkRunner.");
- JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(),
- mOptions.getAppName());
-
+ JavaSparkContext jsc;
+ if (mOptions.getUsesProvidedSparkContext()) {
+ LOG.info("Using a provided Spark Context");
+ jsc = mOptions.getProvidedSparkContext();
+ if (jsc == null || jsc.sc().isStopped()){
+ LOG.error("The provided Spark context "
+ + jsc + " was not created or was stopped");
+ throw new RuntimeException("The provided Spark context was not created or was stopped");
+ }
+ } else {
+ LOG.info("Creating a new Spark Context");
+ jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName());
+ }
if (mOptions.isStreaming()) {
SparkPipelineTranslator translator =
new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
new file mode 100644
index 0000000..cbc5976
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Test;
+
+/**
+ * Provided Spark Context tests.
+ */
+public class ProvidedSparkContextTest {
+ private static final String[] WORDS_ARRAY = {
+ "hi there", "hi", "hi sue bob",
+ "hi sue", "", "bob hi"};
+ private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+ private static final Set<String> EXPECTED_COUNT_SET =
+ ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+ private static final String PROVIDED_CONTEXT_EXCEPTION =
+ "The provided Spark context was not created or was stopped";
+
+ /**
+ * Provide a context and call pipeline run.
+ * @throws Exception
+ */
+ @Test
+ public void testWithProvidedContext() throws Exception {
+ JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ options.setUsesProvidedSparkContext(true);
+ options.setProvidedSparkContext(jsc);
+
+ Pipeline p = Pipeline.create(options);
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+ .of()));
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+ PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+ // Run test from pipeline
+ p.run();
+
+ jsc.stop();
+ }
+
+ /**
+ * Provide a context and call pipeline run.
+ * @throws Exception
+ */
+ @Test
+ public void testWithNullContext() throws Exception {
+ JavaSparkContext jsc = null;
+
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ options.setUsesProvidedSparkContext(true);
+ options.setProvidedSparkContext(jsc);
+
+ Pipeline p = Pipeline.create(options);
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+ .of()));
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+ PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+ try {
+ p.run();
+ fail("Should throw an exception when The provided Spark context is null");
+ } catch (RuntimeException e){
+ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
+ }
+ }
+
+ /**
+ * A SparkRunner with a stopped provided Spark context cannot run pipelines.
+ * @throws Exception
+ */
+ @Test
+ public void testWithStoppedProvidedContext() throws Exception {
+ JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+ // Stop the provided Spark context directly
+ jsc.stop();
+
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ options.setUsesProvidedSparkContext(true);
+ options.setProvidedSparkContext(jsc);
+
+ Pipeline p = Pipeline.create(options);
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+ .of()));
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+ PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+ try {
+ p.run();
+ fail("Should throw an exception when The provided Spark context is stopped");
+ } catch (RuntimeException e){
+ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
+ }
+ }
+
+}
[2/2] incubator-beam git commit: [BEAM-313] This closes #401
Posted by jb...@apache.org.
[BEAM-313] This closes #401
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3666c22c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3666c22c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3666c22c
Branch: refs/heads/master
Commit: 3666c22cbf06009d97ab39707318aae56c9da907
Parents: 676843e 017da7b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Aug 29 13:56:54 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 29 13:56:54 2016 +0200
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 13 ++
.../apache/beam/runners/spark/SparkRunner.java | 16 ++-
.../runners/spark/ProvidedSparkContextTest.java | 138 +++++++++++++++++++
3 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------