You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 05:32:23 UTC

incubator-beam git commit: SparkRunner calls pipeline.run

Repository: incubator-beam
Updated Branches:
  refs/heads/master c314e670e -> ac0875de8


SparkRunner calls pipeline.run

* Remove SparkStreamingPipelineOptions.
* Run pipeline with Pipeline.run().
* Better EmbeddedKafka.
* Avoid NPE if factory wasn't created.
* Let EmbeddedKafka/Zookeeper discover ports on their own.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac0875de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac0875de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac0875de

Branch: refs/heads/master
Commit: ac0875de84085e1298575d0887e83e5deee5f418
Parents: c314e67
Author: Sela <an...@paypal.com>
Authored: Wed Jul 27 23:11:37 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 22:31:48 2016 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  6 ++++
 .../apache/beam/runners/spark/SparkRunner.java  | 14 +++------
 .../runners/spark/SparkRunnerRegistrar.java     |  6 ++--
 .../spark/SparkStreamingPipelineOptions.java    | 32 --------------------
 .../beam/runners/spark/TestSparkRunner.java     |  2 --
 .../apache/beam/runners/spark/DeDupTest.java    |  2 +-
 .../beam/runners/spark/EmptyInputTest.java      |  2 +-
 .../beam/runners/spark/SimpleWordCountTest.java |  4 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |  2 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |  2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  2 +-
 .../beam/runners/spark/io/NumShardsTest.java    |  2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  2 +-
 .../spark/translation/CombineGloballyTest.java  |  2 +-
 .../spark/translation/CombinePerKeyTest.java    |  2 +-
 .../spark/translation/DoFnOutputTest.java       |  6 ++--
 .../translation/MultiOutputWordCountTest.java   |  2 +-
 .../spark/translation/SerializationTest.java    |  2 +-
 .../spark/translation/SideEffectsTest.java      |  8 ++---
 .../streaming/FlattenStreamingTest.java         |  8 ++---
 .../streaming/KafkaStreamingTest.java           | 13 ++++----
 .../streaming/SimpleStreamingWordCountTest.java |  8 ++---
 .../streaming/utils/EmbeddedKafkaCluster.java   |  4 ++-
 23 files changed, 49 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4bb2a57..6ef3741 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -33,4 +33,10 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   @Default.String("local[1]")
   String getSparkMaster();
   void setSparkMaster(String master);
+
+  @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
+          + "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+  void setTimeout(Long batchInterval);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index dfda987..d994ec4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -69,8 +69,6 @@ import org.slf4j.LoggerFactory;
  * options.setSparkMaster("spark://host:port");
  * EvaluationResult result = SparkRunner.create(options).run(p);
  * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
  */
 public final class SparkRunner extends PipelineRunner<EvaluationResult> {
 
@@ -146,12 +144,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   @Override
   public EvaluationResult run(Pipeline pipeline) {
     try {
-      // validate streaming configuration
-      if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
-        throw new RuntimeException("A streaming job must be configured with "
-            + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
-            + mOptions.getClass().getSimpleName());
-      }
       LOG.info("Executing pipeline using the SparkRunner.");
       JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
               .getSparkMaster(), mOptions.getAppName());
@@ -179,6 +171,9 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
 
         return ctxt;
       } else {
+        if (mOptions.getTimeout() > 0) {
+          LOG.info("Timeout is ignored by the SparkRunner in batch.");
+        }
         EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
         SparkPipelineTranslator translator = new TransformTranslator.Translator();
         pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
@@ -210,9 +205,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   private EvaluationContext
       createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
       Duration batchDuration) {
-    SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
+    return new StreamingEvaluationContext(jsc, pipeline, jssc, mOptions.getTimeout());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 2bed6a5..7a31753 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -49,15 +49,13 @@ public final class SparkRunnerRegistrar {
   }
 
   /**
-   * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}.
+   * Registers the {@link SparkPipelineOptions}.
    */
   @AutoService(PipelineOptionsRegistrar.class)
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(
-          SparkPipelineOptions.class,
-          SparkStreamingPipelineOptions.class);
+      return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
deleted file mode 100644
index 5944acd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options used to configure Spark streaming.
- */
-public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
-  @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until "
-          + "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-  void setTimeout(Long batchInterval);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e2b953d..50ed5f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -46,8 +46,6 @@ import org.apache.beam.sdk.values.POutput;
  * options.setSparkMaster("spark://host:port");
  * EvaluationResult result = SparkRunner.create(options).run(p);
  * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
  */
 public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index dcf04a7..9a16744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -56,7 +56,7 @@ public class DeDupTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index 7befec2..c2e331f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -49,7 +49,7 @@ public class EmptyInputTest {
     PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     assertEquals("", Iterables.getOnlyElement(res.get(output)));
     res.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 6f5ce5e..441d92d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -67,7 +67,7 @@ public class SimpleWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 
@@ -87,7 +87,7 @@ public class SimpleWordCountTest {
     File outputFile = testFolder.newFile();
     output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
 
     assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 236251b..3ca9df4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -41,7 +41,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index e4a293f..074e6aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -72,7 +72,7 @@ public class TfIdfTest {
 
     PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
 
-    EvaluationResult res = SparkRunner.create().run(pipeline);
+    EvaluationResult res = (EvaluationResult) pipeline.run();
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 787292e..d862424 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -81,7 +81,7 @@ public class AvroPipelineTest {
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 36d8b67..9c65917 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -79,7 +79,7 @@ public class NumShardsTest {
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
 
     int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 6d09503..01aa839 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -92,7 +92,7 @@ public class HadoopFileFormatPipelineTest {
     HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
 
     IntWritable key = new IntWritable();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index 798f55a..e4ef7d7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -55,7 +55,7 @@ public class CombineGloballyTest {
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
         Iterables.getOnlyElement(res.get(output)));
     res.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 2e477e9..dee9213 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -57,7 +57,7 @@ public class CombinePerKeyTest {
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
         PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
-        EvaluationResult res = SparkRunner.create().run(p);
+        EvaluationResult res = (EvaluationResult) p.run();
         Map<String, Long> actualCnts = new HashMap<>();
         for (KV<String, Long> kv : res.get(cnts)) {
             actualCnts.put(kv.getKey(), kv.getValue());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 263ce99..e4b25bb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -41,9 +41,9 @@ public class DoFnOutputTest implements Serializable {
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
-    PCollection<String> strings = pipeline.apply(Create.of("a"));
+    PCollection<String> strings = p.apply(Create.of("a"));
     // Test that values written from startBundle() and finishBundle() are written to
     // the output
     PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
@@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("start", "a", "finish");
 
-    EvaluationResult res = SparkRunner.create().run(pipeline);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 739eec3..066521b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -84,7 +84,7 @@ public class MultiOutputWordCountTest {
     PCollection<Long> unique = luc.get(lowerCnts).apply(
         ApproximateUnique.<KV<String, Long>>globally(16));
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
         .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 5e96c46..fb97b8b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -140,7 +140,7 @@ public class SerializationTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = SparkRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 5775565..6cefa49 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -50,11 +50,11 @@ public class SideEffectsTest implements Serializable {
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
-    pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
+    p.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         throw new UserException();
@@ -62,7 +62,7 @@ public class SideEffectsTest implements Serializable {
     }));
 
     try {
-      pipeline.run();
+      p.run();
       fail("Run should thrown an exception");
     } catch (RuntimeException e) {
       assertNotNull(e.getCause());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index ed7e9b7..deb1b6a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
@@ -57,8 +57,8 @@ public class FlattenStreamingTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options =
-        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+    SparkPipelineOptions options =
+        PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -77,7 +77,7 @@ public class FlattenStreamingTest {
 
     PAssertStreaming.assertContents(union, EXPECTED_UNION);
 
-    EvaluationResult res = SparkRunner.create(options).run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index c005f14..fa98ca3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -56,10 +56,9 @@ import kafka.serializer.StringDecoder;
  */
 public class KafkaStreamingTest {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
-          new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
+          new EmbeddedKafkaCluster.EmbeddedZookeeper();
   private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
-          new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(),
-                  new Properties(), Collections.singletonList(6667));
+          new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
   private static final String TOPIC = "kafka_dataflow_test_topic";
   private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
       "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
@@ -89,8 +88,8 @@ public class KafkaStreamingTest {
   @Test
   public void testRun() throws Exception {
     // test read from Kafka
-    SparkStreamingPipelineOptions options =
-        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+    SparkPipelineOptions options =
+        PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -112,7 +111,7 @@ public class KafkaStreamingTest {
 
     PAssertStreaming.assertContents(formattedKV, EXPECTED);
 
-    EvaluationResult res = SparkRunner.create(options).run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4fa03f6..5627056 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -54,8 +54,8 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options =
-        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+    SparkPipelineOptions options =
+        PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -70,7 +70,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
-    EvaluationResult res = SparkRunner.create(options).run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 0fec573..cd326ef 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -219,7 +219,9 @@ public class EmbeddedKafkaCluster {
 
 
     public void shutdown() {
-      factory.shutdown();
+      if (factory != null) {
+        factory.shutdown();
+      }
       try {
         TestUtils.deleteFile(snapshotDir);
       } catch (FileNotFoundException e) {