You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/08/29 12:05:21 UTC

[1/2] incubator-beam git commit: [BEAM-313] Provide a context for SparkRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 676843e04 -> 3666c22cb


[BEAM-313] Provide a context for SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/017da7ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/017da7ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/017da7ba

Branch: refs/heads/master
Commit: 017da7bac3e844ef7391aabbcbaf86c9c99af968
Parents: 676843e
Author: Abbass MAROUNI <am...@talend.com>
Authored: Mon Aug 29 13:28:46 2016 +0200
Committer: Abbass MAROUNI <am...@talend.com>
Committed: Mon Aug 29 13:28:46 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  13 ++
 .../apache/beam/runners/spark/SparkRunner.java  |  16 ++-
 .../runners/spark/ProvidedSparkContextTest.java | 138 +++++++++++++++++++
 3 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index be4f7f0..db6b75c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,11 +18,14 @@
 
 package org.apache.beam.runners.spark;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * Spark runner pipeline options.
@@ -49,4 +52,14 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   @Default.Boolean(true)
   Boolean getEnableSparkSinks();
   void setEnableSparkSinks(Boolean enableSparkSinks);
+
+  @Description("If the spark runner will be initialized with a provided Spark Context")
+  @Default.Boolean(false)
+  boolean getUsesProvidedSparkContext();
+  void setUsesProvidedSparkContext(boolean value);
+
+  @Description("Provided Java Spark Context")
+  @JsonIgnore
+  JavaSparkContext getProvidedSparkContext();
+  void setProvidedSparkContext(JavaSparkContext jsc);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index fa85a2e..9f1a839 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -143,9 +143,19 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(),
-          mOptions.getAppName());
-
+      JavaSparkContext jsc;
+      if (mOptions.getUsesProvidedSparkContext()) {
+        LOG.info("Using a provided Spark Context");
+        jsc = mOptions.getProvidedSparkContext();
+        if (jsc == null || jsc.sc().isStopped()){
+          LOG.error("The provided Spark context "
+                  + jsc + " was not created or was stopped");
+          throw new RuntimeException("The provided Spark context was not created or was stopped");
+        }
+      } else {
+        LOG.info("Creating a new Spark Context");
+        jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName());
+      }
       if (mOptions.isStreaming()) {
         SparkPipelineTranslator translator =
             new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
new file mode 100644
index 0000000..cbc5976
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Test;
+
+/**
+ * Provided Spark Context tests.
+ */
+public class ProvidedSparkContextTest {
+    private static final String[] WORDS_ARRAY = {
+            "hi there", "hi", "hi sue bob",
+            "hi sue", "", "bob hi"};
+    private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+    private static final Set<String> EXPECTED_COUNT_SET =
+            ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+    private static final String PROVIDED_CONTEXT_EXCEPTION =
+            "The provided Spark context was not created or was stopped";
+
+    /**
+     * Provide a context and call pipeline run.
+     * @throws Exception
+     */
+    @Test
+    public void testWithProvidedContext() throws Exception {
+        JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+        try {
+            // Run test from pipeline
+            createWordCountPipeline(jsc).run();
+        } finally {
+            // Stop the context even when the run fails, so a failing test does not leave it
+            // running for the tests that follow.
+            jsc.stop();
+        }
+    }
+
+    /**
+     * A SparkRunner told to use a provided Spark context cannot run pipelines when that
+     * context is null.
+     * @throws Exception
+     */
+    @Test
+    public void testWithNullContext() throws Exception {
+        JavaSparkContext jsc = null;
+
+        assertRunRejectsContext(jsc,
+                "Should throw an exception when The provided Spark context is null");
+    }
+
+    /**
+     * A SparkRunner with a stopped provided Spark context cannot run pipelines.
+     * @throws Exception
+     */
+    @Test
+    public void testWithStoppedProvidedContext() throws Exception {
+        JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+        // Stop the provided Spark context directly
+        jsc.stop();
+
+        assertRunRejectsContext(jsc,
+                "Should throw an exception when The provided Spark context is stopped");
+    }
+
+    /**
+     * Builds the word-count pipeline shared by the tests, with options that select the
+     * SparkRunner and pass it the given context as the provided Spark context.
+     *
+     * @param jsc the context to store in the options; may be null or already stopped
+     * @return the pipeline, with a PAssert on the expected word counts already attached
+     */
+    private static Pipeline createWordCountPipeline(JavaSparkContext jsc) {
+        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        options.setRunner(SparkRunner.class);
+        options.setUsesProvidedSparkContext(true);
+        options.setProvidedSparkContext(jsc);
+
+        Pipeline p = Pipeline.create(options);
+        PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+                .of()));
+        PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+                .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+        PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+        return p;
+    }
+
+    /**
+     * Runs the word-count pipeline with the given context and checks that the run is refused
+     * with the provided-context error.
+     *
+     * @param jsc the unusable context to provide to the runner
+     * @param failureMessage the message reported when the run unexpectedly succeeds
+     */
+    private static void assertRunRejectsContext(JavaSparkContext jsc, String failureMessage) {
+        Pipeline p = createWordCountPipeline(jsc);
+        try {
+            p.run();
+            fail(failureMessage);
+        } catch (RuntimeException e) {
+            // Use a JUnit assertion rather than the Java assert keyword, which is a no-op
+            // unless the JVM runs with -ea. A null message also fails cleanly here.
+            String message = String.valueOf(e.getMessage());
+            org.junit.Assert.assertTrue("Unexpected exception message: " + message,
+                    message.contains(PROVIDED_CONTEXT_EXCEPTION));
+        }
+    }
+}


[2/2] incubator-beam git commit: [BEAM-313] This closes #401

Posted by jb...@apache.org.
[BEAM-313] This closes #401


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3666c22c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3666c22c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3666c22c

Branch: refs/heads/master
Commit: 3666c22cbf06009d97ab39707318aae56c9da907
Parents: 676843e 017da7b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Aug 29 13:56:54 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 29 13:56:54 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  13 ++
 .../apache/beam/runners/spark/SparkRunner.java  |  16 ++-
 .../runners/spark/ProvidedSparkContextTest.java | 138 +++++++++++++++++++
 3 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------