Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/21 18:15:07 UTC
[1/2] incubator-beam git commit: Rename SparkPipelineRunner to SparkRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master 2b9906e8d -> 3001804e3
Rename SparkPipelineRunner to SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96ffc429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96ffc429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96ffc429
Branch: refs/heads/master
Commit: 96ffc42972010c9b027e826d6d610555ab0c055a
Parents: 2b9906e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 10:45:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 11:14:57 2016 -0700
----------------------------------------------------------------------
README.md | 2 +-
runners/spark/README.md | 8 +-
runners/spark/pom.xml | 2 +-
.../beam/runners/spark/SparkPipelineRunner.java | 255 -------------------
.../apache/beam/runners/spark/SparkRunner.java | 255 +++++++++++++++++++
.../runners/spark/SparkRunnerRegistrar.java | 8 +-
.../runners/spark/TestSparkPipelineRunner.java | 77 ------
.../beam/runners/spark/TestSparkRunner.java | 77 ++++++
.../translation/SparkPipelineEvaluator.java | 6 +-
.../StreamingWindowPipelineDetector.java | 6 +-
.../apache/beam/runners/spark/DeDupTest.java | 4 +-
.../beam/runners/spark/EmptyInputTest.java | 4 +-
.../beam/runners/spark/SimpleWordCountTest.java | 8 +-
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../apache/beam/runners/spark/TfIdfTest.java | 4 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 4 +-
.../beam/runners/spark/io/NumShardsTest.java | 6 +-
.../io/hadoop/HadoopFileFormatPipelineTest.java | 4 +-
.../spark/translation/CombineGloballyTest.java | 6 +-
.../spark/translation/CombinePerKeyTest.java | 4 +-
.../spark/translation/DoFnOutputTest.java | 6 +-
.../translation/MultiOutputWordCountTest.java | 4 +-
.../spark/translation/SerializationTest.java | 6 +-
.../spark/translation/SideEffectsTest.java | 4 +-
.../translation/TransformTranslatorTest.java | 6 +-
.../translation/WindowedWordCountTest.java | 8 +-
.../streaming/FlattenStreamingTest.java | 6 +-
.../streaming/KafkaStreamingTest.java | 6 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +-
29 files changed, 397 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c4a9155..ec89c4d 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Beam supports executing programs on multiple distributed processing backends thr
- The `DirectRunner` runs the pipeline on your local machine.
- The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
- The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
-- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
+- The `SparkRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
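
For context, the runner named in this list is selected programmatically through PipelineOptions before the pipeline is built. A minimal sketch of that selection for the newly renamed SparkRunner (assuming the Beam Java SDK and the SparkPipelineOptions interface shown later in this commit):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SelectSparkRunner {
      public static void main(String[] args) {
        // Build options and point them at the renamed runner class.
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);

        // Pipelines created from these options execute on Spark when run.
        Pipeline p = Pipeline.create(options);
        // ... apply transforms ...
        p.run();
      }
    }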
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 457f0a9..d2bfd3e 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -89,7 +89,7 @@ If we wanted to run a Beam pipeline with the default options of a single threade
instance in local mode, we would do the following:
Pipeline p = <logic for pipeline creation >
- EvaluationResult result = SparkPipelineRunner.create().run(p);
+ EvaluationResult result = SparkRunner.create().run(p);
To create a pipeline runner to run against a different Spark cluster, with a custom master url we
would do the following:
@@ -97,7 +97,7 @@ would do the following:
Pipeline p = <logic for pipeline creation >
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
options.setSparkMaster("spark://host:port");
- EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+ EvaluationResult result = SparkRunner.create(options).run(p);
## Word Count Example
@@ -113,7 +113,7 @@ Then run the [word count example][wc] from the SDK using a single threaded Spark
in local mode:
mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \
- -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
+ -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
-DsparkMaster=local
Check the output by running:
@@ -139,7 +139,7 @@ Then run the word count example using Spark submit with the `yarn-client` master
--class org.apache.beam.examples.WordCount \
--master yarn-client \
target/spark-runner-*-spark-app.jar \
- --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client
+ --inputFile=kinglear.txt --output=out --runner=SparkRunner --sparkMaster=yarn-client
Check the output by running:
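
Putting the README's two snippets together, a complete local-mode invocation might look like the following sketch (based on the patterns used in this module's tests; the transform chain is elided):

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local"); // or "spark://host:port" for a cluster
    Pipeline p = Pipeline.create(options);
    // ... apply transforms to p ...
    EvaluationResult result = SparkRunner.create(options).run(p);
    result.close();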
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 747464e..741f2db 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,7 @@
<systemPropertyVariables>
<beamTestPipelineOptions>
[
- "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner",
+ "--runner=TestSparkRunner",
"--streaming=false"
]
</beamTestPipelineOptions>
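
This property is how the SDK's test harness learns which runner to use: TestPipeline parses the JSON array in the beamTestPipelineOptions system property into pipeline options. A rough sketch of the consuming side (assuming TestPipeline's behavior in the Beam SDK at this time; the simple name "TestSparkRunner" resolves because of the registrar change below):

    // Equivalent to the surefire configuration above: set the property,
    // then let TestPipeline parse it into PipelineOptions.
    System.setProperty(
        "beamTestPipelineOptions",
        "[\"--runner=TestSparkRunner\", \"--streaming=false\"]");
    Pipeline p = TestPipeline.create(); // runner resolves to TestSparkRunner
    // ... apply transforms and PAssert checks, then p.run() ...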
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
deleted file mode 100644
index 90404dd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.translation.EvaluationContext;
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
-import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkProcessContext;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
-import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import org.apache.spark.SparkException;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The SparkPipelineRunner translate operations defined on a pipeline to a representation
- * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a dataflow pipeline with the default options of a single threaded spark instance in local mode,
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkPipelineRunner.create().run(p);
- * }
- *
- * To create a pipeline runner to run against a different spark cluster, with a custom master url
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
- * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
- */
-public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
- /**
- * Options used in this pipeline runner.
- */
- private final SparkPipelineOptions mOptions;
-
- /**
- * Creates and returns a new SparkPipelineRunner with default options. In particular, against a
- * spark instance running in local mode.
- *
- * @return A pipeline runner with default options.
- */
- public static SparkPipelineRunner create() {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
- return new SparkPipelineRunner(options);
- }
-
- /**
- * Creates and returns a new SparkPipelineRunner with specified options.
- *
- * @param options The SparkPipelineOptions to use when executing the job.
- * @return A pipeline runner that will execute with specified options.
- */
- public static SparkPipelineRunner create(SparkPipelineOptions options) {
- return new SparkPipelineRunner(options);
- }
-
- /**
- * Creates and returns a new SparkPipelineRunner with specified options.
- *
- * @param options The PipelineOptions to use when executing the job.
- * @return A pipeline runner that will execute with specified options.
- */
- public static SparkPipelineRunner fromOptions(PipelineOptions options) {
- SparkPipelineOptions sparkOptions =
- PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
- return new SparkPipelineRunner(sparkOptions);
- }
-
- /**
- * Overrides for this runner.
- */
- @SuppressWarnings("rawtypes")
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
-
- if (transform instanceof GroupByKey) {
- return (OutputT) ((PCollection) input).apply(
- new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
- } else if (transform instanceof Create.Values) {
- return (OutputT) super.apply(
- new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
- } else {
- return super.apply(transform, input);
- }
- }
-
-
- /**
- * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
- * thread.
- */
- private SparkPipelineRunner(SparkPipelineOptions options) {
- mOptions = options;
- }
-
-
- @Override
- public EvaluationResult run(Pipeline pipeline) {
- try {
- // validate streaming configuration
- if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
- throw new RuntimeException("A streaming job must be configured with "
- + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
- + mOptions.getClass().getSimpleName());
- }
- LOG.info("Executing pipeline using the SparkPipelineRunner.");
- JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
- .getSparkMaster(), mOptions.getAppName());
-
- if (mOptions.isStreaming()) {
- SparkPipelineTranslator translator =
- new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
- // if streaming - fixed window should be defined on all UNBOUNDED inputs
- StreamingWindowPipelineDetector streamingWindowPipelineDetector =
- new StreamingWindowPipelineDetector(translator);
- pipeline.traverseTopologically(streamingWindowPipelineDetector);
- if (!streamingWindowPipelineDetector.isWindowing()) {
- throw new IllegalStateException("Spark streaming pipeline must be windowed!");
- }
-
- Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
- LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
- EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
-
- pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
- ctxt.computeOutputs();
-
- LOG.info("Streaming pipeline construction complete. Starting execution..");
- ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
-
- return ctxt;
- } else {
- EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
- SparkPipelineTranslator translator = new TransformTranslator.Translator();
- pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
- ctxt.computeOutputs();
-
- LOG.info("Pipeline execution complete.");
-
- return ctxt;
- }
- } catch (Exception e) {
- // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
- // won't let you catch something that is not declared, so we can't catch
- // SparkException here. Instead we do an instanceof check.
- // Then we find the cause by seeing if it's a user exception (wrapped by our
- // SparkProcessException), or just use the SparkException cause.
- if (e instanceof SparkException && e.getCause() != null) {
- if (e.getCause() instanceof SparkProcessContext.SparkProcessException
- && e.getCause().getCause() != null) {
- throw new RuntimeException(e.getCause().getCause());
- } else {
- throw new RuntimeException(e.getCause());
- }
- }
- // otherwise just wrap in a RuntimeException
- throw new RuntimeException(e);
- }
- }
-
- private EvaluationContext
- createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
- Duration batchDuration) {
- SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
- JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
- return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
- }
-
- /**
- * Evaluator on the pipeline.
- */
- public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
- protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
-
- protected final SparkPipelineTranslator translator;
-
- protected Evaluator(SparkPipelineTranslator translator) {
- this.translator = translator;
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
- if (node.getTransform() != null) {
- @SuppressWarnings("unchecked")
- Class<PTransform<?, ?>> transformClass =
- (Class<PTransform<?, ?>>) node.getTransform().getClass();
- if (translator.hasTranslation(transformClass)) {
- LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
- LOG.debug("Composite transform class: '{}'", transformClass);
- doVisitTransform(node);
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
- }
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void visitPrimitiveTransform(TransformTreeNode node) {
- doVisitTransform(node);
- }
-
- protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
- doVisitTransform(TransformTreeNode node);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
new file mode 100644
index 0000000..dfda987
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkProcessContext;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkRunner translates operations defined on a pipeline into a representation
+ * executable by Spark, and then submits the job to Spark to be executed. If we wanted to run
+ * a Beam pipeline with the default options of a single-threaded Spark instance in local mode,
+ * we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = SparkRunner.create().run(p);
+ * }
+ *
+ * To create a pipeline runner to run against a different spark cluster, with a custom master url
+ * we would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ * options.setSparkMaster("spark://host:port");
+ * EvaluationResult result = SparkRunner.create(options).run(p);
+ * }
+ *
+ * To create a Spark streaming pipeline runner, use {@link SparkStreamingPipelineOptions}.
+ */
+public final class SparkRunner extends PipelineRunner<EvaluationResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+ /**
+ * Options used in this pipeline runner.
+ */
+ private final SparkPipelineOptions mOptions;
+
+ /**
+ * Creates and returns a new SparkRunner with default options. In particular, against a
+ * spark instance running in local mode.
+ *
+ * @return A pipeline runner with default options.
+ */
+ public static SparkRunner create() {
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ return new SparkRunner(options);
+ }
+
+ /**
+ * Creates and returns a new SparkRunner with specified options.
+ *
+ * @param options The SparkPipelineOptions to use when executing the job.
+ * @return A pipeline runner that will execute with specified options.
+ */
+ public static SparkRunner create(SparkPipelineOptions options) {
+ return new SparkRunner(options);
+ }
+
+ /**
+ * Creates and returns a new SparkRunner with specified options.
+ *
+ * @param options The PipelineOptions to use when executing the job.
+ * @return A pipeline runner that will execute with specified options.
+ */
+ public static SparkRunner fromOptions(PipelineOptions options) {
+ SparkPipelineOptions sparkOptions =
+ PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+ return new SparkRunner(sparkOptions);
+ }
+
+ /**
+ * Overrides for this runner.
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+
+ if (transform instanceof GroupByKey) {
+ return (OutputT) ((PCollection) input).apply(
+ new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+ } else if (transform instanceof Create.Values) {
+ return (OutputT) super.apply(
+ new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
+ } else {
+ return super.apply(transform, input);
+ }
+ }
+
+
+ /**
+ * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
+ * thread.
+ */
+ private SparkRunner(SparkPipelineOptions options) {
+ mOptions = options;
+ }
+
+
+ @Override
+ public EvaluationResult run(Pipeline pipeline) {
+ try {
+ // validate streaming configuration
+ if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
+ throw new RuntimeException("A streaming job must be configured with "
+ + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
+ + mOptions.getClass().getSimpleName());
+ }
+ LOG.info("Executing pipeline using the SparkRunner.");
+ JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
+ .getSparkMaster(), mOptions.getAppName());
+
+ if (mOptions.isStreaming()) {
+ SparkPipelineTranslator translator =
+ new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
+ // if streaming - fixed window should be defined on all UNBOUNDED inputs
+ StreamingWindowPipelineDetector streamingWindowPipelineDetector =
+ new StreamingWindowPipelineDetector(translator);
+ pipeline.traverseTopologically(streamingWindowPipelineDetector);
+ if (!streamingWindowPipelineDetector.isWindowing()) {
+ throw new IllegalStateException("Spark streaming pipeline must be windowed!");
+ }
+
+ Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
+ LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
+ EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
+
+ pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+ ctxt.computeOutputs();
+
+ LOG.info("Streaming pipeline construction complete. Starting execution..");
+ ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
+
+ return ctxt;
+ } else {
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+ SparkPipelineTranslator translator = new TransformTranslator.Translator();
+ pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+ ctxt.computeOutputs();
+
+ LOG.info("Pipeline execution complete.");
+
+ return ctxt;
+ }
+ } catch (Exception e) {
+ // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+ // won't let you catch something that is not declared, so we can't catch
+ // SparkException here. Instead we do an instanceof check.
+ // Then we find the cause by seeing if it's a user exception (wrapped by our
+ // SparkProcessException), or just use the SparkException cause.
+ if (e instanceof SparkException && e.getCause() != null) {
+ if (e.getCause() instanceof SparkProcessContext.SparkProcessException
+ && e.getCause().getCause() != null) {
+ throw new RuntimeException(e.getCause().getCause());
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ // otherwise just wrap in a RuntimeException
+ throw new RuntimeException(e);
+ }
+ }
+
+ private EvaluationContext
+ createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+ Duration batchDuration) {
+ SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
+ JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+ return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
+ }
+
+ /**
+ * Evaluator on the pipeline.
+ */
+ public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
+ protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
+
+ protected final SparkPipelineTranslator translator;
+
+ protected Evaluator(SparkPipelineTranslator translator) {
+ this.translator = translator;
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ if (node.getTransform() != null) {
+ @SuppressWarnings("unchecked")
+ Class<PTransform<?, ?>> transformClass =
+ (Class<PTransform<?, ?>>) node.getTransform().getClass();
+ if (translator.hasTranslation(transformClass)) {
+ LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
+ LOG.debug("Composite transform class: '{}'", transformClass);
+ doVisitTransform(node);
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ doVisitTransform(node);
+ }
+
+ protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
+ doVisitTransform(TransformTreeNode node);
+ }
+}
+
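
As run() above shows, streaming execution requires SparkStreamingPipelineOptions and a windowed unbounded input, while batch execution needs neither. A hedged sketch of configuring a streaming run (the setStreaming setter is an assumption inferred from the isStreaming() getter used above):

    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true); // routes run() into the streaming branch
    Pipeline p = Pipeline.create(options);
    // ... unbounded sources with a fixed window applied ...
    EvaluationResult result = SparkRunner.fromOptions(options).run(p);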
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index baa2241..2bed6a5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link SparkPipelineRunner}.
+ * {@link SparkRunner}.
*
* {@link AutoService} will register Spark's implementations of the {@link PipelineRunner}
* and {@link PipelineOptions} as available pipeline runner services.
@@ -37,14 +37,14 @@ public final class SparkRunnerRegistrar {
private SparkRunnerRegistrar() {}
/**
- * Registers the {@link SparkPipelineRunner}.
+ * Registers the {@link SparkRunner}.
*/
@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- SparkPipelineRunner.class, TestSparkPipelineRunner.class);
+ return ImmutableList
+ .<Class<? extends PipelineRunner<?>>>of(SparkRunner.class, TestSparkRunner.class);
}
}
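
The registrar is what allows options to name the runner by its simple class name, as in the pom.xml change above where "--runner=TestSparkRunner" replaces the fully qualified name. A sketch of the discovery mechanism (assuming the standard java.util.ServiceLoader lookup that @AutoService generates metadata for):

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.runners.PipelineRunner;
    import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

    // Enumerate every registrar on the classpath; Spark's registrar
    // contributes SparkRunner and TestSparkRunner to the result.
    for (PipelineRunnerRegistrar registrar
        : ServiceLoader.load(PipelineRunnerRegistrar.class)) {
      for (Class<? extends PipelineRunner<?>> runner : registrar.getPipelineRunners()) {
        System.out.println(runner.getSimpleName()); // e.g. "SparkRunner"
      }
    }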
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
deleted file mode 100644
index d11d1c1..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * The SparkPipelineRunner translate operations defined on a pipeline to a representation executable
- * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow
- * pipeline with the default options of a single threaded spark instance in local mode, we would do
- * the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkPipelineRunner.create().run(p);
- * }
- *
- * To create a pipeline runner to run against a different spark cluster, with a custom master url we
- * would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
- * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
- */
-public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> {
-
- private SparkPipelineRunner delegate;
-
- private TestSparkPipelineRunner(SparkPipelineOptions options) {
- this.delegate = SparkPipelineRunner.fromOptions(options);
- }
-
- public static TestSparkPipelineRunner fromOptions(PipelineOptions options) {
- // Default options suffice to set it up as a test runner
- SparkPipelineOptions sparkOptions =
- PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
- return new TestSparkPipelineRunner(sparkOptions);
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput>
- OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
- return delegate.apply(transform, input);
- };
-
- @Override
- public EvaluationResult run(Pipeline pipeline) {
- return delegate.run(pipeline);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
new file mode 100644
index 0000000..e2b953d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * The SparkRunner translates operations defined on a pipeline into a representation executable
+ * by Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam
+ * pipeline with the default options of a single-threaded Spark instance in local mode, we would
+ * do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = SparkRunner.create().run(p);
+ * }
+ *
+ * To create a pipeline runner to run against a different spark cluster, with a custom master url we
+ * would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ * options.setSparkMaster("spark://host:port");
+ * EvaluationResult result = SparkRunner.create(options).run(p);
+ * }
+ *
+ * To create a Spark streaming pipeline runner, use {@link SparkStreamingPipelineOptions}.
+ */
+public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
+
+ private SparkRunner delegate;
+
+ private TestSparkRunner(SparkPipelineOptions options) {
+ this.delegate = SparkRunner.fromOptions(options);
+ }
+
+ public static TestSparkRunner fromOptions(PipelineOptions options) {
+ // Default options suffice to set it up as a test runner
+ SparkPipelineOptions sparkOptions =
+ PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+ return new TestSparkRunner(sparkOptions);
+ }
+
+ @Override
+ public <OutputT extends POutput, InputT extends PInput>
+ OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+ return delegate.apply(transform, input);
+ };
+
+ @Override
+ public EvaluationResult run(Pipeline pipeline) {
+ return delegate.run(pipeline);
+ }
+}
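
TestSparkRunner is a thin wrapper: fromOptions() validates the options, and apply() and run() simply forward to an underlying SparkRunner. A hedged usage sketch mirroring how the surefire configuration above exercises it:

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    Pipeline p = Pipeline.create(options);
    // ... transforms plus PAssert assertions ...
    p.run(); // dispatches to TestSparkRunner, which delegates to SparkRunner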
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
index 609c413..02e8b3d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.spark.translation;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
+ * Pipeline {@link SparkRunner.Evaluator} for Spark.
*/
-public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
+public final class SparkPipelineEvaluator extends SparkRunner.Evaluator {
private final EvaluationContext ctxt;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
index f6f3029..394b2c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.spark.translation.streaming;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -35,9 +35,9 @@ import org.apache.spark.streaming.Durations;
/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
+ * Pipeline {@link SparkRunner.Evaluator} to detect windowing.
*/
-public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator {
// Currently, Spark streaming recommends batches no smaller than 500 msec
private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 285a2d6..dcf04a7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -49,14 +49,14 @@ public class DeDupTest {
@Test
public void testRun() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index f227e94..7befec2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -43,13 +43,13 @@ public class EmptyInputTest {
@Test
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
List<String> empty = Collections.emptyList();
PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
assertEquals("", Iterables.getOnlyElement(res.get(output)));
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..da3fa7a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -64,7 +64,7 @@ public class SimpleWordCountTest {
@Test
public void testInMem() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
@@ -72,7 +72,7 @@ public class SimpleWordCountTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
}
@@ -82,7 +82,7 @@ public class SimpleWordCountTest {
@Test
public void testOutputFile() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
@@ -92,7 +92,7 @@ public class SimpleWordCountTest {
output.apply(
TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index d2e57aa..236251b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest {
@Test
public void testRunners() {
- assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class),
+ assertEquals(ImmutableList.of(SparkRunner.class, TestSparkRunner.class),
new SparkRunnerRegistrar.Runner().getPipelineRunners());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 00c4657..df78338 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -42,7 +42,7 @@ public class TfIdfTest {
@Test
public void testTfIdf() throws Exception {
SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- opts.setRunner(SparkPipelineRunner.class);
+ opts.setRunner(SparkRunner.class);
Pipeline pipeline = Pipeline.create(opts);
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
@@ -60,7 +60,7 @@ public class TfIdfTest {
PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
- EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
+ EvaluationResult res = SparkRunner.create().run(pipeline);
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f358878..4cce03d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io;
import static org.junit.Assert.assertEquals;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -78,7 +78,7 @@ public class AvroPipelineTest {
PCollection<GenericRecord> input = p.apply(
AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
List<GenericRecord> records = readGenericFile();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 23d4592..b4268d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.examples.WordCount;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
@@ -73,13 +73,13 @@ public class NumShardsTest {
@Test
public void testText() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index eaa508c..4d1658f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io.hadoop;
import static org.junit.Assert.assertEquals;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
@@ -89,7 +89,7 @@ public class HadoopFileFormatPipelineTest {
HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
outputFormatClass, IntWritable.class, Text.class);
input.apply(write.withoutSharding());
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
IntWritable key = new IntWritable();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index 9a3edd3..798f55a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -50,12 +50,12 @@ public class CombineGloballyTest {
@Test
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
Iterables.getOnlyElement(res.get(output)));
res.close();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index face526..65c6870 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -54,7 +54,7 @@ public class CombinePerKeyTest {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
Map<String, Long> actualCnts = new HashMap<>();
for (KV<String, Long> kv : res.get(cnts)) {
actualCnts.put(kv.getKey(), kv.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 0334bfe..0f60271 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -40,7 +40,7 @@ public class DoFnOutputTest implements Serializable {
@Test
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> strings = pipeline.apply(Create.of("a"));
@@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable {
PAssert.that(output).containsInAnyOrder("start", "a", "finish");
- EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
+ EvaluationResult res = SparkRunner.create().run(pipeline);
res.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 3402bb4..787691d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -81,7 +81,7 @@ public class MultiOutputWordCountTest {
PCollection<Long> unique = luc.get(lowerCnts).apply(
ApproximateUnique.<KV<String, Long>>globally(16));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
.containsInAnyOrder(EXPECTED_LOWER_COUNTS);
Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index de3c152..5b9eeff 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -132,7 +132,7 @@ public class SerializationTest {
@Test
public void testRun() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<StringHolder> inputWords =
p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
@@ -140,7 +140,7 @@ public class SerializationTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = SparkRunner.create().run(p);
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 5674900..60b7f71 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public class SideEffectsTest implements Serializable {
@Test
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..aca36dc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -53,13 +53,13 @@ public class TransformTranslatorTest {
/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
- * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+ * in DirectRunner and on SparkRunner, with the mapped dataflow-to-spark
* transforms. Finally it makes sure that the results are the same for both runs.
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
String directOut = runPipeline(DirectRunner.class);
- String sparkOut = runPipeline(SparkPipelineRunner.class);
+ String sparkOut = runPipeline(SparkRunner.class);
List<String> directOutput =
Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
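The Javadoc tweak above describes this test's approach: build one TextIO.Read/TextIO.Write pipeline, run it once under DirectRunner and once under SparkRunner, and assert both outputs agree. A sketch of that dual-runner comparison follows; runPipeline, the class name, and all paths are hypothetical stand-ins rather than the test's actual helpers, and a single output shard is assumed:

    import static org.hamcrest.Matchers.containsInAnyOrder;
    import static org.junit.Assert.assertThat;

    import com.google.common.base.Charsets;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.runners.PipelineRunner;

    public class DualRunnerComparisonSketch {
      // Hypothetical helper standing in for the test's real runPipeline:
      // run the same read/write pipeline under the given runner and return
      // the output prefix. The input and output paths are placeholders.
      static String runPipeline(Class<? extends PipelineRunner<?>> runner, String out) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(runner);
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from("/tmp/words.txt"))
         .apply(TextIO.Write.to(out));
        p.run();
        return out;
      }

      public static void main(String[] args) throws IOException {
        String directOut = runPipeline(DirectRunner.class, "/tmp/direct/out");
        String sparkOut = runPipeline(SparkRunner.class, "/tmp/spark/out");
        // Compare the single output shard from each run, as the hunk above does.
        List<String> directLines =
            Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
        List<String> sparkLines =
            Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
        assertThat(sparkLines, containsInAnyOrder(directLines.toArray()));
      }
    }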
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 54af5e3..043d506 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -57,7 +57,7 @@ public class WindowedWordCountTest {
@Test
public void testFixed() throws Exception {
PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkPipelineRunner.class);
+ opts.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords =
p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
@@ -78,7 +78,7 @@ public class WindowedWordCountTest {
@Test
public void testFixed2() throws Exception {
PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkPipelineRunner.class);
+ opts.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
.withCoder(StringUtf8Coder.of()));
@@ -100,7 +100,7 @@ public class WindowedWordCountTest {
@Test
public void testSliding() throws Exception {
PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkPipelineRunner.class);
+ opts.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
.withCoder(StringUtf8Coder.of()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 976c7c2..160f21d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -60,7 +60,7 @@ public class FlattenStreamingTest {
SparkStreamingPipelineOptions options =
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
@@ -77,7 +77,7 @@ public class FlattenStreamingTest {
PAssertStreaming.assertContents(union, EXPECTED_UNION);
- EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+ EvaluationResult res = SparkRunner.create(options).run(p);
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 53293fb..5578e35 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.KafkaIO;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
@@ -92,7 +92,7 @@ public class KafkaStreamingTest {
SparkStreamingPipelineOptions options =
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
@@ -112,7 +112,7 @@ public class KafkaStreamingTest {
PAssertStreaming.assertContents(formattedKV, EXPECTED);
- EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+ EvaluationResult res = SparkRunner.create(options).run(p);
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 6dc9a08..75a702b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -56,7 +56,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
SparkStreamingPipelineOptions options =
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
- options.setRunner(SparkPipelineRunner.class);
+ options.setRunner(SparkRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
@@ -68,7 +68,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
- EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+ EvaluationResult res = SparkRunner.create(options).run(p);
res.close();
}
}
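The three streaming tests in this patch (FlattenStreamingTest, KafkaStreamingTest, SimpleStreamingWordCountTest) share the same post-rename setup. A condensed sketch of that streaming shape; the app name and timeout value here are illustrative stand-ins for the tests' own constants:

    // Streaming variant: SparkStreamingPipelineOptions plus a bounded timeout,
    // still executed through the renamed SparkRunner.
    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setAppName("StreamingSketch");  // illustrative; tests use getClass().getSimpleName()
    options.setRunner(SparkRunner.class);
    options.setTimeout(1000L);              // run for one interval; illustrative value
    Pipeline p = Pipeline.create(options);
    // ... build the pipeline (e.g. from CreateStream) and attach
    // PAssertStreaming.assertContents(...) as the tests above do ...
    EvaluationResult res = SparkRunner.create(options).run(p);
    res.close();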
[2/2] incubator-beam git commit: Closes #488
Posted by dh...@apache.org.
Closes #488
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3001804e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3001804e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3001804e
Branch: refs/heads/master
Commit: 3001804e3cb33d2d399f64e3c88308e136321782
Parents: 2b9906e 96ffc42
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 21 11:14:58 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 11:14:58 2016 -0700
----------------------------------------------------------------------
README.md | 2 +-
runners/spark/README.md | 8 +-
runners/spark/pom.xml | 2 +-
.../beam/runners/spark/SparkPipelineRunner.java | 255 -------------------
.../apache/beam/runners/spark/SparkRunner.java | 255 +++++++++++++++++++
.../runners/spark/SparkRunnerRegistrar.java | 8 +-
.../runners/spark/TestSparkPipelineRunner.java | 77 ------
.../beam/runners/spark/TestSparkRunner.java | 77 ++++++
.../translation/SparkPipelineEvaluator.java | 6 +-
.../StreamingWindowPipelineDetector.java | 6 +-
.../apache/beam/runners/spark/DeDupTest.java | 4 +-
.../beam/runners/spark/EmptyInputTest.java | 4 +-
.../beam/runners/spark/SimpleWordCountTest.java | 8 +-
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../apache/beam/runners/spark/TfIdfTest.java | 4 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 4 +-
.../beam/runners/spark/io/NumShardsTest.java | 6 +-
.../io/hadoop/HadoopFileFormatPipelineTest.java | 4 +-
.../spark/translation/CombineGloballyTest.java | 6 +-
.../spark/translation/CombinePerKeyTest.java | 4 +-
.../spark/translation/DoFnOutputTest.java | 6 +-
.../translation/MultiOutputWordCountTest.java | 4 +-
.../spark/translation/SerializationTest.java | 6 +-
.../spark/translation/SideEffectsTest.java | 4 +-
.../translation/TransformTranslatorTest.java | 6 +-
.../translation/WindowedWordCountTest.java | 8 +-
.../streaming/FlattenStreamingTest.java | 6 +-
.../streaming/KafkaStreamingTest.java | 6 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +-
29 files changed, 397 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
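Net effect of the two commits for downstream users: a pure class rename with no behavior change (the diffstat above is symmetric at 397 insertions and 397 deletions). A minimal before/after sketch for migrating pipeline code, assuming an existing Pipeline p and its options:

    // Before this change:
    //   options.setRunner(SparkPipelineRunner.class);
    //   EvaluationResult res = SparkPipelineRunner.create().run(p);
    // After this change:
    options.setRunner(SparkRunner.class);
    EvaluationResult res = SparkRunner.create().run(p);
    res.close();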