Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/21 18:15:07 UTC

[1/2] incubator-beam git commit: Rename SparkPipelineRunner to SparkRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b9906e8d -> 3001804e3


Rename SparkPipelineRunner to SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96ffc429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96ffc429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96ffc429

Branch: refs/heads/master
Commit: 96ffc42972010c9b027e826d6d610555ab0c055a
Parents: 2b9906e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 10:45:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 11:14:57 2016 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +-
 runners/spark/README.md                         |   8 +-
 runners/spark/pom.xml                           |   2 +-
 .../beam/runners/spark/SparkPipelineRunner.java | 255 -------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 255 +++++++++++++++++++
 .../runners/spark/SparkRunnerRegistrar.java     |   8 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 ------
 .../beam/runners/spark/TestSparkRunner.java     |  77 ++++++
 .../translation/SparkPipelineEvaluator.java     |   6 +-
 .../StreamingWindowPipelineDetector.java        |   6 +-
 .../apache/beam/runners/spark/DeDupTest.java    |   4 +-
 .../beam/runners/spark/EmptyInputTest.java      |   4 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |   4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   6 +-
 .../spark/translation/CombinePerKeyTest.java    |   4 +-
 .../spark/translation/DoFnOutputTest.java       |   6 +-
 .../translation/MultiOutputWordCountTest.java   |   4 +-
 .../spark/translation/SerializationTest.java    |   6 +-
 .../spark/translation/SideEffectsTest.java      |   4 +-
 .../translation/TransformTranslatorTest.java    |   6 +-
 .../translation/WindowedWordCountTest.java      |   8 +-
 .../streaming/FlattenStreamingTest.java         |   6 +-
 .../streaming/KafkaStreamingTest.java           |   6 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 29 files changed, 397 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c4a9155..ec89c4d 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Beam supports executing programs on multiple distributed processing backends thr
 - The `DirectRunner` runs the pipeline on your local machine.
 - The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
 - The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
-- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
+- The `SparkRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
 
 Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
 

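For reference, the smallest end-to-end use of the renamed runner, mirroring the
pattern the updated tests later in this commit follow (a sketch against the Beam
API as of this commit, not itself part of the commit):

    import org.apache.beam.runners.spark.EvaluationResult;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class SparkRunnerExample {
      public static void main(String[] args) throws Exception {
        // Select the renamed runner explicitly, as the updated tests do.
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("hello", "world"));

        // SparkRunner.create() builds a runner with default (local-mode) options.
        EvaluationResult res = SparkRunner.create().run(p);
        res.close();
      }
    }
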
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 457f0a9..d2bfd3e 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -89,7 +89,7 @@ If we wanted to run a Beam pipeline with the default options of a single threade
 instance in local mode, we would do the following:
 
     Pipeline p = <logic for pipeline creation >
-    EvaluationResult result = SparkPipelineRunner.create().run(p);
+    EvaluationResult result = SparkRunner.create().run(p);
 
 To create a pipeline runner to run against a different Spark cluster, with a custom master url we
 would do the following:
@@ -97,7 +97,7 @@ would do the following:
     Pipeline p = <logic for pipeline creation >
     SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
     options.setSparkMaster("spark://host:port");
-    EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult result = SparkRunner.create(options).run(p);
 
 ## Word Count Example
 
@@ -113,7 +113,7 @@ Then run the [word count example][wc] from the SDK using a single threaded Spark
 in local mode:
 
     mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \
-      -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
+      -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
       -DsparkMaster=local
 
 Check the output by running:
@@ -139,7 +139,7 @@ Then run the word count example using Spark submit with the `yarn-client` master
       --class org.apache.beam.examples.WordCount \
       --master yarn-client \
       target/spark-runner-*-spark-app.jar \
-        --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client
+        --inputFile=kinglear.txt --output=out --runner=SparkRunner --sparkMaster=yarn-client
 
 Check the output by running:
 

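The --runner=SparkRunner flags above rely on the example's main method handing its
arguments to PipelineOptionsFactory; a minimal sketch of that wiring (the class
below is illustrative, not the actual WordCount source):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class Main {
      public static void main(String[] args) {
        // "--runner=SparkRunner" is resolved by simple class name through the
        // runner registrar touched later in this commit.
        SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SparkPipelineOptions.class);
        Pipeline p = Pipeline.create(options);
        // ... construct the pipeline ...
        p.run();
      }
    }
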
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 747464e..741f2db 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,7 @@
                     <systemPropertyVariables>
                       <beamTestPipelineOptions>
                         [
-                          "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner",
+                          "--runner=TestSparkRunner",
                           "--streaming=false"
                         ]
                       </beamTestPipelineOptions>

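The options above reach each test through the beamTestPipelineOptions system
property; a sketch of a test that would pick them up, assuming the standard
TestPipeline harness (the test body is illustrative):

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.junit.Test;

    public class ExampleRunnableOnServiceTest {
      @Test
      public void runsOnTestSparkRunner() {
        // TestPipeline.create() reads -DbeamTestPipelineOptions, so with the
        // surefire config above this executes on TestSparkRunner.
        TestPipeline p = TestPipeline.create();
        PAssert.that(p.apply(Create.of(1, 2, 3))).containsInAnyOrder(1, 2, 3);
        p.run();
      }
    }
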
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
deleted file mode 100644
index 90404dd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.translation.EvaluationContext;
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
-import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkProcessContext;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
-import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import org.apache.spark.SparkException;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The SparkPipelineRunner translate operations defined on a pipeline to a representation
- * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a dataflow pipeline with the default options of a single threaded spark instance in local mode,
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkPipelineRunner.create().run(p);
- * }
- *
- * To create a pipeline runner to run against a different spark cluster, with a custom master url
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
- * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
- */
-public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
-  /**
-   * Options used in this pipeline runner.
-   */
-  private final SparkPipelineOptions mOptions;
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with default options. In particular, against a
-   * spark instance running in local mode.
-   *
-   * @return A pipeline runner with default options.
-   */
-  public static SparkPipelineRunner create() {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
-    return new SparkPipelineRunner(options);
-  }
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with specified options.
-   *
-   * @param options The SparkPipelineOptions to use when executing the job.
-   * @return A pipeline runner that will execute with specified options.
-   */
-  public static SparkPipelineRunner create(SparkPipelineOptions options) {
-    return new SparkPipelineRunner(options);
-  }
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with specified options.
-   *
-   * @param options The PipelineOptions to use when executing the job.
-   * @return A pipeline runner that will execute with specified options.
-   */
-  public static SparkPipelineRunner fromOptions(PipelineOptions options) {
-    SparkPipelineOptions sparkOptions =
-        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
-    return new SparkPipelineRunner(sparkOptions);
-  }
-
-  /**
-   * Overrides for this runner.
-   */
-  @SuppressWarnings("rawtypes")
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-
-    if (transform instanceof GroupByKey) {
-      return (OutputT) ((PCollection) input).apply(
-          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else if (transform instanceof Create.Values) {
-      return (OutputT) super.apply(
-        new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-
-  /**
-   * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
-   * thread.
-   */
-  private SparkPipelineRunner(SparkPipelineOptions options) {
-    mOptions = options;
-  }
-
-
-  @Override
-  public EvaluationResult run(Pipeline pipeline) {
-    try {
-      // validate streaming configuration
-      if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
-        throw new RuntimeException("A streaming job must be configured with "
-            + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
-            + mOptions.getClass().getSimpleName());
-      }
-      LOG.info("Executing pipeline using the SparkPipelineRunner.");
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
-              .getSparkMaster(), mOptions.getAppName());
-
-      if (mOptions.isStreaming()) {
-        SparkPipelineTranslator translator =
-                new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
-        // if streaming - fixed window should be defined on all UNBOUNDED inputs
-        StreamingWindowPipelineDetector streamingWindowPipelineDetector =
-            new StreamingWindowPipelineDetector(translator);
-        pipeline.traverseTopologically(streamingWindowPipelineDetector);
-        if (!streamingWindowPipelineDetector.isWindowing()) {
-          throw new IllegalStateException("Spark streaming pipeline must be windowed!");
-        }
-
-        Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
-        LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
-        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
-
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
-        ctxt.computeOutputs();
-
-        LOG.info("Streaming pipeline construction complete. Starting execution..");
-        ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
-
-        return ctxt;
-      } else {
-        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
-        SparkPipelineTranslator translator = new TransformTranslator.Translator();
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
-        ctxt.computeOutputs();
-
-        LOG.info("Pipeline execution complete.");
-
-        return ctxt;
-      }
-    } catch (Exception e) {
-      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
-      // won't let you catch something that is not declared, so we can't catch
-      // SparkException here. Instead we do an instanceof check.
-      // Then we find the cause by seeing if it's a user exception (wrapped by our
-      // SparkProcessException), or just use the SparkException cause.
-      if (e instanceof SparkException && e.getCause() != null) {
-        if (e.getCause() instanceof SparkProcessContext.SparkProcessException
-            && e.getCause().getCause() != null) {
-          throw new RuntimeException(e.getCause().getCause());
-        } else {
-          throw new RuntimeException(e.getCause());
-        }
-      }
-      // otherwise just wrap in a RuntimeException
-      throw new RuntimeException(e);
-    }
-  }
-
-  private EvaluationContext
-      createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      Duration batchDuration) {
-    SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
-    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
-  }
-
-  /**
-   * Evaluator on the pipeline.
-   */
-  public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
-    protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
-
-    protected final SparkPipelineTranslator translator;
-
-    protected Evaluator(SparkPipelineTranslator translator) {
-      this.translator = translator;
-    }
-
-    @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
-      if (node.getTransform() != null) {
-        @SuppressWarnings("unchecked")
-        Class<PTransform<?, ?>> transformClass =
-            (Class<PTransform<?, ?>>) node.getTransform().getClass();
-        if (translator.hasTranslation(transformClass)) {
-          LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
-          LOG.debug("Composite transform class: '{}'", transformClass);
-          doVisitTransform(node);
-          return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-        }
-      }
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-
-    @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
-      doVisitTransform(node);
-    }
-
-    protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformTreeNode node);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
new file mode 100644
index 0000000..dfda987
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkProcessContext;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkRunner translates operations defined on a pipeline into a representation
+ * executable by Spark, and then submits the job to Spark for execution. If we wanted to run
+ * a Beam pipeline with the default options of a single-threaded Spark instance in local mode,
+ * we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = SparkRunner.create().run(p);
+ * }
+ *
+ * To create a pipeline runner to run against a different Spark cluster, with a custom master URL,
+ * we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ * options.setSparkMaster("spark://host:port");
+ * EvaluationResult result = SparkRunner.create(options).run(p);
+ * }
+ *
+ * To create a Spark streaming pipeline runner, use {@link SparkStreamingPipelineOptions}.
+ */
+public final class SparkRunner extends PipelineRunner<EvaluationResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+  /**
+   * Options used in this pipeline runner.
+   */
+  private final SparkPipelineOptions mOptions;
+
+  /**
+   * Creates and returns a new SparkRunner with default options; in particular, one that runs
+   * against a Spark instance in local mode.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static SparkRunner create() {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The SparkPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner create(SparkPipelineOptions options) {
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The PipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner fromOptions(PipelineOptions options) {
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    return new SparkRunner(sparkOptions);
+  }
+
+  /**
+   * Overrides for this runner.
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+
+    if (transform instanceof GroupByKey) {
+      return (OutputT) ((PCollection) input).apply(
+          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+    } else if (transform instanceof Create.Values) {
+      return (OutputT) super.apply(
+        new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+
+  /**
+   * Constructs a runner with the supplied options. The default options run this pipeline in
+   * Spark's local mode, in a single thread.
+   */
+  private SparkRunner(SparkPipelineOptions options) {
+    mOptions = options;
+  }
+
+
+  @Override
+  public EvaluationResult run(Pipeline pipeline) {
+    try {
+      // validate streaming configuration
+      if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
+        throw new RuntimeException("A streaming job must be configured with "
+            + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
+            + mOptions.getClass().getSimpleName());
+      }
+      LOG.info("Executing pipeline using the SparkRunner.");
+      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
+              .getSparkMaster(), mOptions.getAppName());
+
+      if (mOptions.isStreaming()) {
+        SparkPipelineTranslator translator =
+                new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
+        // if streaming - fixed window should be defined on all UNBOUNDED inputs
+        StreamingWindowPipelineDetector streamingWindowPipelineDetector =
+            new StreamingWindowPipelineDetector(translator);
+        pipeline.traverseTopologically(streamingWindowPipelineDetector);
+        if (!streamingWindowPipelineDetector.isWindowing()) {
+          throw new IllegalStateException("Spark streaming pipeline must be windowed!");
+        }
+
+        Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
+        LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
+        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
+
+        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+        ctxt.computeOutputs();
+
+        LOG.info("Streaming pipeline construction complete. Starting execution..");
+        ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
+
+        return ctxt;
+      } else {
+        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+        SparkPipelineTranslator translator = new TransformTranslator.Translator();
+        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+        ctxt.computeOutputs();
+
+        LOG.info("Pipeline execution complete.");
+
+        return ctxt;
+      }
+    } catch (Exception e) {
+      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+      // won't let you catch something that is not declared, so we can't catch
+      // SparkException here. Instead we do an instanceof check.
+      // Then we find the cause by seeing if it's a user exception (wrapped by our
+      // SparkProcessException), or just use the SparkException cause.
+      if (e instanceof SparkException && e.getCause() != null) {
+        if (e.getCause() instanceof SparkProcessContext.SparkProcessException
+            && e.getCause().getCause() != null) {
+          throw new RuntimeException(e.getCause().getCause());
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
+      // otherwise just wrap in a RuntimeException
+      throw new RuntimeException(e);
+    }
+  }
+
+  private EvaluationContext
+      createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      Duration batchDuration) {
+    SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
+    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+    return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
+  }
+
+  /**
+   * Evaluator on the pipeline.
+   */
+  public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
+    protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
+
+    protected final SparkPipelineTranslator translator;
+
+    protected Evaluator(SparkPipelineTranslator translator) {
+      this.translator = translator;
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+      if (node.getTransform() != null) {
+        @SuppressWarnings("unchecked")
+        Class<PTransform<?, ?>> transformClass =
+            (Class<PTransform<?, ?>>) node.getTransform().getClass();
+        if (translator.hasTranslation(transformClass)) {
+          LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
+          LOG.debug("Composite transform class: '{}'", transformClass);
+          doVisitTransform(node);
+          return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+        }
+      }
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(TransformTreeNode node) {
+      doVisitTransform(node);
+    }
+
+    protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
+        doVisitTransform(TransformTreeNode node);
+  }
+}
+

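As run() above enforces, a streaming job must be configured with
SparkStreamingPipelineOptions and must window every unbounded input. A sketch of
such a setup (setStreaming is an assumed setter paired with isStreaming(); the
options interfaces are not part of this diff):

    import org.apache.beam.runners.spark.EvaluationResult;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class StreamingSetup {
      public static void main(String[] args) {
        SparkStreamingPipelineOptions options =
            PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setStreaming(true); // assumed setter paired with isStreaming()

        Pipeline p = Pipeline.create(options);
        // ... every UNBOUNDED input must have a fixed window applied, or run()
        // fails with "Spark streaming pipeline must be windowed!" ...

        EvaluationResult res = SparkRunner.fromOptions(options).run(p);
      }
    }
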
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index baa2241..2bed6a5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
 
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link SparkPipelineRunner}.
+ * {@link SparkRunner}.
  *
  * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner}
  * and {@link PipelineOptions} as available pipeline runner services.
@@ -37,14 +37,14 @@ public final class SparkRunnerRegistrar {
   private SparkRunnerRegistrar() {}
 
   /**
-   * Registers the {@link SparkPipelineRunner}.
+   * Registers the {@link SparkRunner}.
    */
   @AutoService(PipelineRunnerRegistrar.class)
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          SparkPipelineRunner.class, TestSparkPipelineRunner.class);
+      return ImmutableList
+          .<Class<? extends PipelineRunner<?>>>of(SparkRunner.class, TestSparkRunner.class);
     }
   }
 

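The registrar works because @AutoService emits the META-INF/services entry that
java.util.ServiceLoader discovers; roughly the lookup that lets "--runner" accept
simple class names (this resolve helper is illustrative, not Beam's actual
implementation, and the registrar interface's package location is assumed):

    import java.util.ServiceLoader;

    import org.apache.beam.sdk.runners.PipelineRunner;
    import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

    public final class RunnerLookupSketch {
      private RunnerLookupSketch() {}

      public static Class<? extends PipelineRunner<?>> resolve(String simpleName) {
        for (PipelineRunnerRegistrar registrar
            : ServiceLoader.load(PipelineRunnerRegistrar.class)) {
          for (Class<? extends PipelineRunner<?>> runner : registrar.getPipelineRunners()) {
            if (runner.getSimpleName().equals(simpleName)) {
              return runner; // e.g. "SparkRunner" -> org.apache.beam.runners.spark.SparkRunner
            }
          }
        }
        throw new IllegalArgumentException("No registered runner named " + simpleName);
      }
    }
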
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
deleted file mode 100644
index d11d1c1..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * The SparkPipelineRunner translate operations defined on a pipeline to a representation executable
- * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow
- * pipeline with the default options of a single threaded spark instance in local mode, we would do
- * the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkPipelineRunner.create().run(p);
- * }
- *
- * To create a pipeline runner to run against a different spark cluster, with a custom master url we
- * would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
- * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
- */
-public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> {
-
-  private SparkPipelineRunner delegate;
-
-  private TestSparkPipelineRunner(SparkPipelineOptions options) {
-    this.delegate = SparkPipelineRunner.fromOptions(options);
-  }
-
-  public static TestSparkPipelineRunner fromOptions(PipelineOptions options) {
-    // Default options suffice to set it up as a test runner
-    SparkPipelineOptions sparkOptions =
-        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
-    return new TestSparkPipelineRunner(sparkOptions);
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
-    return delegate.apply(transform, input);
-  };
-
-  @Override
-  public EvaluationResult run(Pipeline pipeline) {
-    return delegate.run(pipeline);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
new file mode 100644
index 0000000..e2b953d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * The TestSparkRunner is a {@link SparkRunner} wrapper for tests: it delegates pipeline
+ * application and execution to an underlying SparkRunner built from the supplied options. If we
+ * wanted to run a Beam pipeline with the default options of a single-threaded Spark instance in
+ * local mode, we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = SparkRunner.create().run(p);
+ * }
+ *
+ * To create a pipeline runner to run against a different Spark cluster, with a custom master URL,
+ * we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ * options.setSparkMaster("spark://host:port");
+ * EvaluationResult result = SparkRunner.create(options).run(p);
+ * }
+ *
+ * To create a Spark streaming pipeline runner, use {@link SparkStreamingPipelineOptions}.
+ */
+public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
+
+  private SparkRunner delegate;
+
+  private TestSparkRunner(SparkPipelineOptions options) {
+    this.delegate = SparkRunner.fromOptions(options);
+  }
+
+  public static TestSparkRunner fromOptions(PipelineOptions options) {
+    // Default options suffice to set it up as a test runner
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    return new TestSparkRunner(sparkOptions);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  };
+
+  @Override
+  public EvaluationResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+}

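TestSparkRunner's public surface is just fromOptions plus the delegating apply and
run; a minimal direct use (sketch only):

    import org.apache.beam.runners.spark.EvaluationResult;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.TestSparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class TestSparkRunnerExample {
      public static void main(String[] args) throws Exception {
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(TestSparkRunner.class);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("a", "b"));

        // fromOptions validates the options and wraps a SparkRunner delegate.
        EvaluationResult res = TestSparkRunner.fromOptions(options).run(p);
        res.close();
      }
    }
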
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
index 609c413..02e8b3d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
 /**
- * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
+ * Pipeline {@link SparkRunner.Evaluator} for Spark.
  */
-public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
+public final class SparkPipelineEvaluator extends SparkRunner.Evaluator {
 
   private final EvaluationContext ctxt;
 

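SparkPipelineEvaluator is one subclass of the visitor base SparkRunner.Evaluator
defined in the new SparkRunner.java above; a minimal illustrative subclass against
that abstract contract (logging only, not part of this commit):

    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
    import org.apache.beam.sdk.runners.TransformTreeNode;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PInput;
    import org.apache.beam.sdk.values.POutput;

    // Logs each transform the topological traversal visits.
    final class LoggingEvaluator extends SparkRunner.Evaluator {
      LoggingEvaluator(SparkPipelineTranslator translator) {
        super(translator);
      }

      @Override
      protected <TransformT extends PTransform<? super PInput, POutput>> void
          doVisitTransform(TransformTreeNode node) {
        LOG.info("Visiting '{}'", node.getFullName());
      }
    }
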
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
index f6f3029..394b2c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.spark.translation.streaming;
 
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -35,9 +35,9 @@ import org.apache.spark.streaming.Durations;
 
 
 /**
- * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
+ * Pipeline {@link SparkRunner.Evaluator} to detect windowing.
  */
-public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator {
 
  // Currently, Spark streaming recommends batches no smaller than 500 msec
   private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 285a2d6..dcf04a7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -49,14 +49,14 @@ public class DeDupTest {
   @Test
   public void testRun() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index f227e94..7befec2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -43,13 +43,13 @@ public class EmptyInputTest {
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     List<String> empty = Collections.emptyList();
     PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     assertEquals("", Iterables.getOnlyElement(res.get(output)));
     res.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..da3fa7a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -64,7 +64,7 @@ public class SimpleWordCountTest {
   @Test
   public void testInMem() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
@@ -72,7 +72,7 @@ public class SimpleWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
   }
 
@@ -82,7 +82,7 @@ public class SimpleWordCountTest {
   @Test
   public void testOutputFile() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
@@ -92,7 +92,7 @@ public class SimpleWordCountTest {
     output.apply(
         TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
 
     assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index d2e57aa..236251b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest {
 
   @Test
   public void testRunners() {
-    assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class),
+    assertEquals(ImmutableList.of(SparkRunner.class, TestSparkRunner.class),
         new SparkRunnerRegistrar.Runner().getPipelineRunners());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 00c4657..df78338 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -42,7 +42,7 @@ public class TfIdfTest {
   @Test
   public void testTfIdf() throws Exception {
     SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline pipeline = Pipeline.create(opts);
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
@@ -60,7 +60,7 @@ public class TfIdfTest {
 
     PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
 
-    EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
+    EvaluationResult res = SparkRunner.create().run(pipeline);
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f358878..4cce03d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -78,7 +78,7 @@ public class AvroPipelineTest {
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 23d4592..b4268d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
@@ -73,13 +73,13 @@ public class NumShardsTest {
   @Test
   public void testText() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
 
     int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index eaa508c..4d1658f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io.hadoop;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -89,7 +89,7 @@ public class HadoopFileFormatPipelineTest {
     HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
 
     IntWritable key = new IntWritable();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index 9a3edd3..798f55a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -50,12 +50,12 @@ public class CombineGloballyTest {
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
         Iterables.getOnlyElement(res.get(output)));
     res.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index face526..65c6870 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -54,7 +54,7 @@ public class CombinePerKeyTest {
         Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
         PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
-        EvaluationResult res = SparkPipelineRunner.create().run(p);
+        EvaluationResult res = SparkRunner.create().run(p);
         Map<String, Long> actualCnts = new HashMap<>();
         for (KV<String, Long> kv : res.get(cnts)) {
             actualCnts.put(kv.getKey(), kv.getValue());

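[Editor's note] Note the readback idiom above: EvaluationResult.get(PCollection) materializes a PCollection so the test can inspect it on the driver. A condensed sketch of that idiom, assuming the same API (cnts is the test's per-key counts; imports as in the test):

    EvaluationResult res = SparkRunner.create().run(p);
    Map<String, Long> actualCnts = new HashMap<>();
    for (KV<String, Long> kv : res.get(cnts)) {  // pulls results back to the driver
      actualCnts.put(kv.getKey(), kv.getValue());
    }
    res.close();  // close only after results have been read

For a single-element result, Iterables.getOnlyElement(res.get(output)) serves the same purpose, as CombineGloballyTest shows above.
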
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 0334bfe..0f60271 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -40,7 +40,7 @@ public class DoFnOutputTest implements Serializable {
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<String> strings = pipeline.apply(Create.of("a"));
@@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("start", "a", "finish");
 
-    EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
+    EvaluationResult res = SparkRunner.create().run(pipeline);
     res.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 3402bb4..787691d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -81,7 +81,7 @@ public class MultiOutputWordCountTest {
     PCollection<Long> unique = luc.get(lowerCnts).apply(
         ApproximateUnique.<KV<String, Long>>globally(16));
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
         .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index de3c152..5b9eeff 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -132,7 +132,7 @@ public class SerializationTest {
   @Test
   public void testRun() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<StringHolder> inputWords =
         p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
@@ -140,7 +140,7 @@ public class SerializationTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = SparkRunner.create().run(p);
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 5674900..60b7f71 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public class SideEffectsTest implements Serializable {
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..aca36dc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -53,13 +53,13 @@ public class TransformTranslatorTest {
 
   /**
    * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
-   * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+   * in DirectRunner and on SparkRunner, with the mapped dataflow-to-spark
    * transforms. Finally it makes sure that the results are the same for both runs.
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
     String directOut = runPipeline(DirectRunner.class);
-    String sparkOut = runPipeline(SparkPipelineRunner.class);
+    String sparkOut = runPipeline(SparkRunner.class);
 
     List<String> directOutput =
         Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);

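[Editor's note] The Javadoc above describes the shape of the cross-runner check. A condensed sketch of that pattern: runPipeline is reconstructed here, this era's TextIO.Read/TextIO.Write API is assumed, and the file paths are illustrative (the shard suffix "-00000-of-00001" and the assert imports are the test's own):

    public class CrossRunnerCheck {
      public static void main(String[] args) throws IOException {
        String directOut = runPipeline(DirectRunner.class);
        String sparkOut = runPipeline(SparkRunner.class);
        List<String> directOutput =
            Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
        List<String> sparkOutput =
            Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
        assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
      }

      private static String runPipeline(Class<? extends PipelineRunner<?>> runner) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(runner);
        Pipeline p = Pipeline.create(options);
        String out = "/tmp/out-" + runner.getSimpleName();  // illustrative path
        p.apply(TextIO.Read.from("/tmp/words.txt"))         // illustrative path
         .apply(TextIO.Write.to(out));
        p.run();
        return out;
      }
    }
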
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 54af5e3..043d506 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -57,7 +57,7 @@ public class WindowedWordCountTest {
   @Test
   public void testFixed() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords =
         p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
@@ -78,7 +78,7 @@ public class WindowedWordCountTest {
   @Test
   public void testFixed2() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
@@ -100,7 +100,7 @@ public class WindowedWordCountTest {
   @Test
   public void testSliding() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));

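[Editor's note] The three cases above differ mainly in the Window transform applied between Create.timestamped and the count. A sketch of the fixed and sliding variants, assuming the SDK's standard windowing API (the durations are illustrative; the test's actual sizes are not visible in this hunk):

    // testFixed / testFixed2: non-overlapping fixed windows.
    PCollection<String> fixed = inputWords.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    // testSliding: overlapping windows, two minutes wide, advancing every minute.
    PCollection<String> sliding = inputWords.apply(
        Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
            .every(Duration.standardMinutes(1))));

(Duration here is org.joda.time.Duration.)
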
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 976c7c2..160f21d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -60,7 +60,7 @@ public class FlattenStreamingTest {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
@@ -77,7 +77,7 @@ public class FlattenStreamingTest {
 
     PAssertStreaming.assertContents(union, EXPECTED_UNION);
 
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }
 

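[Editor's note] The streaming tests add two runner-specific knobs, both visible above: an app name and a timeout that bounds the streaming run. A minimal sketch of the streaming setup, assuming the options type used in this test (TEST_TIMEOUT_MSEC is the test's constant; imports as in the test):

    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setAppName("FlattenStreamingTest");   // named after the test class
    options.setRunner(SparkRunner.class);
    options.setTimeout(TEST_TIMEOUT_MSEC);        // run for one interval, then stop
    Pipeline p = Pipeline.create(options);

    // ... build the unbounded pipeline and its PAssertStreaming checks ...

    EvaluationResult res = SparkRunner.create(options).run(p);  // streaming form passes options to create()
    res.close();
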
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 53293fb..5578e35 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
@@ -92,7 +92,7 @@ public class KafkaStreamingTest {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
@@ -112,7 +112,7 @@ public class KafkaStreamingTest {
 
     PAssertStreaming.assertContents(formattedKV, EXPECTED);
 
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 6dc9a08..75a702b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -56,7 +56,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
@@ -68,7 +68,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
     PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
 
     PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }
 }


[2/2] incubator-beam git commit: Closes #488

Posted by dh...@apache.org.
Closes #488


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3001804e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3001804e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3001804e

Branch: refs/heads/master
Commit: 3001804e3cb33d2d399f64e3c88308e136321782
Parents: 2b9906e 96ffc42
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 21 11:14:58 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 11:14:58 2016 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +-
 runners/spark/README.md                         |   8 +-
 runners/spark/pom.xml                           |   2 +-
 .../beam/runners/spark/SparkPipelineRunner.java | 255 -------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 255 +++++++++++++++++++
 .../runners/spark/SparkRunnerRegistrar.java     |   8 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 ------
 .../beam/runners/spark/TestSparkRunner.java     |  77 ++++++
 .../translation/SparkPipelineEvaluator.java     |   6 +-
 .../StreamingWindowPipelineDetector.java        |   6 +-
 .../apache/beam/runners/spark/DeDupTest.java    |   4 +-
 .../beam/runners/spark/EmptyInputTest.java      |   4 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |   4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   6 +-
 .../spark/translation/CombinePerKeyTest.java    |   4 +-
 .../spark/translation/DoFnOutputTest.java       |   6 +-
 .../translation/MultiOutputWordCountTest.java   |   4 +-
 .../spark/translation/SerializationTest.java    |   6 +-
 .../spark/translation/SideEffectsTest.java      |   4 +-
 .../translation/TransformTranslatorTest.java    |   6 +-
 .../translation/WindowedWordCountTest.java      |   8 +-
 .../streaming/FlattenStreamingTest.java         |   6 +-
 .../streaming/KafkaStreamingTest.java           |   6 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 29 files changed, 397 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
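
[Editor's note] A practical consequence of the rename, via SparkRunnerRegistrar (also touched by this change): the short runner name resolved from PipelineOptions changes from SparkPipelineRunner to SparkRunner. A hedged sketch of name-based selection, assuming the standard registrar lookup by simple class name:

    // e.g. invoked with: --runner=SparkRunner
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    // ... construct the pipeline ...
    p.run();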