You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 05:32:23 UTC
incubator-beam git commit: SparkRunner calls pipeline.run
Repository: incubator-beam
Updated Branches:
refs/heads/master c314e670e -> ac0875de8
SparkRunner calls pipeline.run
* Remove SparkStreamingPipelineOptions.
* Run pipeline with Pipeline.run().
* Better EmbeddedKafka.
* Avoid NPE if factory wasn't created.
* Let EmbeddedKafka/Zookeeper discover ports on their own.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac0875de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac0875de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac0875de
Branch: refs/heads/master
Commit: ac0875de84085e1298575d0887e83e5deee5f418
Parents: c314e67
Author: Sela <an...@paypal.com>
Authored: Wed Jul 27 23:11:37 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 22:31:48 2016 -0700
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 6 ++++
.../apache/beam/runners/spark/SparkRunner.java | 14 +++------
.../runners/spark/SparkRunnerRegistrar.java | 6 ++--
.../spark/SparkStreamingPipelineOptions.java | 32 --------------------
.../beam/runners/spark/TestSparkRunner.java | 2 --
.../apache/beam/runners/spark/DeDupTest.java | 2 +-
.../beam/runners/spark/EmptyInputTest.java | 2 +-
.../beam/runners/spark/SimpleWordCountTest.java | 4 +--
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../apache/beam/runners/spark/TfIdfTest.java | 2 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../beam/runners/spark/io/NumShardsTest.java | 2 +-
.../io/hadoop/HadoopFileFormatPipelineTest.java | 2 +-
.../spark/translation/CombineGloballyTest.java | 2 +-
.../spark/translation/CombinePerKeyTest.java | 2 +-
.../spark/translation/DoFnOutputTest.java | 6 ++--
.../translation/MultiOutputWordCountTest.java | 2 +-
.../spark/translation/SerializationTest.java | 2 +-
.../spark/translation/SideEffectsTest.java | 8 ++---
.../streaming/FlattenStreamingTest.java | 8 ++---
.../streaming/KafkaStreamingTest.java | 13 ++++----
.../streaming/SimpleStreamingWordCountTest.java | 8 ++---
.../streaming/utils/EmbeddedKafkaCluster.java | 4 ++-
23 files changed, 49 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4bb2a57..6ef3741 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -33,4 +33,10 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
@Default.String("local[1]")
String getSparkMaster();
void setSparkMaster(String master);
+
+ @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
+ + "execution is stopped")
+ @Default.Long(-1)
+ Long getTimeout();
+ void setTimeout(Long batchInterval);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index dfda987..d994ec4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -69,8 +69,6 @@ import org.slf4j.LoggerFactory;
* options.setSparkMaster("spark://host:port");
* EvaluationResult result = SparkRunner.create(options).run(p);
* }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
*/
public final class SparkRunner extends PipelineRunner<EvaluationResult> {
@@ -146,12 +144,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
@Override
public EvaluationResult run(Pipeline pipeline) {
try {
- // validate streaming configuration
- if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
- throw new RuntimeException("A streaming job must be configured with "
- + SparkStreamingPipelineOptions.class.getSimpleName() + ", found "
- + mOptions.getClass().getSimpleName());
- }
LOG.info("Executing pipeline using the SparkRunner.");
JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
.getSparkMaster(), mOptions.getAppName());
@@ -179,6 +171,9 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
return ctxt;
} else {
+ if (mOptions.getTimeout() > 0) {
+ LOG.info("Timeout is ignored by the SparkRunner in batch.");
+ }
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
SparkPipelineTranslator translator = new TransformTranslator.Translator();
pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
@@ -210,9 +205,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
private EvaluationContext
createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
Duration batchDuration) {
- SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
- return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
+ return new StreamingEvaluationContext(jsc, pipeline, jssc, mOptions.getTimeout());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 2bed6a5..7a31753 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -49,15 +49,13 @@ public final class SparkRunnerRegistrar {
}
/**
- * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}.
+ * Registers the {@link SparkPipelineOptions}.
*/
@AutoService(PipelineOptionsRegistrar.class)
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(
- SparkPipelineOptions.class,
- SparkStreamingPipelineOptions.class);
+ return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
deleted file mode 100644
index 5944acd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options used to configure Spark streaming.
- */
-public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
- @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until "
- + "execution is stopped")
- @Default.Long(-1)
- Long getTimeout();
- void setTimeout(Long batchInterval);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e2b953d..50ed5f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -46,8 +46,6 @@ import org.apache.beam.sdk.values.POutput;
* options.setSparkMaster("spark://host:port");
* EvaluationResult result = SparkRunner.create(options).run(p);
* }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
*/
public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index dcf04a7..9a16744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -56,7 +56,7 @@ public class DeDupTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index 7befec2..c2e331f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -49,7 +49,7 @@ public class EmptyInputTest {
PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
assertEquals("", Iterables.getOnlyElement(res.get(output)));
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 6f5ce5e..441d92d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -67,7 +67,7 @@ public class SimpleWordCountTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
@@ -87,7 +87,7 @@ public class SimpleWordCountTest {
File outputFile = testFolder.newFile();
output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 236251b..3ca9df4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -41,7 +41,7 @@ public class SparkRunnerRegistrarTest {
@Test
public void testOptions() {
assertEquals(
- ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class),
+ ImmutableList.of(SparkPipelineOptions.class),
new SparkRunnerRegistrar.Options().getPipelineOptions());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index e4a293f..074e6aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -72,7 +72,7 @@ public class TfIdfTest {
PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
- EvaluationResult res = SparkRunner.create().run(pipeline);
+ EvaluationResult res = (EvaluationResult) pipeline.run();
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 787292e..d862424 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -81,7 +81,7 @@ public class AvroPipelineTest {
PCollection<GenericRecord> input = p.apply(
AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
List<GenericRecord> records = readGenericFile();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 36d8b67..9c65917 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -79,7 +79,7 @@ public class NumShardsTest {
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 6d09503..01aa839 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -92,7 +92,7 @@ public class HadoopFileFormatPipelineTest {
HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
outputFormatClass, IntWritable.class, Text.class);
input.apply(write.withoutSharding());
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
IntWritable key = new IntWritable();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index 798f55a..e4ef7d7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -55,7 +55,7 @@ public class CombineGloballyTest {
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
Iterables.getOnlyElement(res.get(output)));
res.close();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 2e477e9..dee9213 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -57,7 +57,7 @@ public class CombinePerKeyTest {
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
Map<String, Long> actualCnts = new HashMap<>();
for (KV<String, Long> kv : res.get(cnts)) {
actualCnts.put(kv.getKey(), kv.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 263ce99..e4b25bb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -41,9 +41,9 @@ public class DoFnOutputTest implements Serializable {
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
- Pipeline pipeline = Pipeline.create(options);
+ Pipeline p = Pipeline.create(options);
- PCollection<String> strings = pipeline.apply(Create.of("a"));
+ PCollection<String> strings = p.apply(Create.of("a"));
// Test that values written from startBundle() and finishBundle() are written to
// the output
PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
@@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable {
PAssert.that(output).containsInAnyOrder("start", "a", "finish");
- EvaluationResult res = SparkRunner.create().run(pipeline);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 739eec3..066521b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -84,7 +84,7 @@ public class MultiOutputWordCountTest {
PCollection<Long> unique = luc.get(lowerCnts).apply(
ApproximateUnique.<KV<String, Long>>globally(16));
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
.containsInAnyOrder(EXPECTED_LOWER_COUNTS);
Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 5e96c46..fb97b8b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -140,7 +140,7 @@ public class SerializationTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
- EvaluationResult res = SparkRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 5775565..6cefa49 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -50,11 +50,11 @@ public class SideEffectsTest implements Serializable {
public void test() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
- Pipeline pipeline = Pipeline.create(options);
+ Pipeline p = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
- pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
+ p.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
throw new UserException();
@@ -62,7 +62,7 @@ public class SideEffectsTest implements Serializable {
}));
try {
- pipeline.run();
+ p.run();
fail("Run should thrown an exception");
} catch (RuntimeException e) {
assertNotNull(e.getCause());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index ed7e9b7..deb1b6a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
import org.apache.beam.sdk.Pipeline;
@@ -57,8 +57,8 @@ public class FlattenStreamingTest {
@Test
public void testRun() throws Exception {
- SparkStreamingPipelineOptions options =
- PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+ SparkPipelineOptions options =
+ PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -77,7 +77,7 @@ public class FlattenStreamingTest {
PAssertStreaming.assertContents(union, EXPECTED_UNION);
- EvaluationResult res = SparkRunner.create(options).run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index c005f14..fa98ca3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.KafkaIO;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -56,10 +56,9 @@ import kafka.serializer.StringDecoder;
*/
public class KafkaStreamingTest {
private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
- new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
+ new EmbeddedKafkaCluster.EmbeddedZookeeper();
private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
- new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(),
- new Properties(), Collections.singletonList(6667));
+ new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
private static final String TOPIC = "kafka_dataflow_test_topic";
private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
"k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
@@ -89,8 +88,8 @@ public class KafkaStreamingTest {
@Test
public void testRun() throws Exception {
// test read from Kafka
- SparkStreamingPipelineOptions options =
- PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+ SparkPipelineOptions options =
+ PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -112,7 +111,7 @@ public class KafkaStreamingTest {
PAssertStreaming.assertContents(formattedKV, EXPECTED);
- EvaluationResult res = SparkRunner.create(options).run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4fa03f6..5627056 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -54,8 +54,8 @@ public class SimpleStreamingWordCountTest implements Serializable {
@Test
public void testRun() throws Exception {
- SparkStreamingPipelineOptions options =
- PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+ SparkPipelineOptions options =
+ PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
@@ -70,7 +70,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
- EvaluationResult res = SparkRunner.create(options).run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 0fec573..cd326ef 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -219,7 +219,9 @@ public class EmbeddedKafkaCluster {
public void shutdown() {
- factory.shutdown();
+ if (factory != null) {
+ factory.shutdown();
+ }
try {
TestUtils.deleteFile(snapshotDir);
} catch (FileNotFoundException e) {