You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 17:11:21 UTC

[4/8] incubator-beam git commit: Update Pipeline Execution Style in WindowedWordCountTest

Update Pipeline Execution Style in WindowedWordCountTest

This sets the runner at Pipeline creation time rather than sending a
(potentially rewritten) pipeline to a new runner instance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de49d032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de49d032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de49d032

Branch: refs/heads/master
Commit: de49d032730dd21691e6e4358fdcfef249aef46f
Parents: 8291219
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:36:42 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700

----------------------------------------------------------------------
 .../spark/translation/WindowedWordCountTest.java | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de49d032/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index c6911e1..54af5e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -55,7 +56,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testFixed() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords =
         p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
     PCollection<String> windowedWords =
@@ -65,7 +68,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 
@@ -74,7 +77,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testFixed2() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
@@ -84,7 +89,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 
@@ -94,7 +99,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testSliding() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
@@ -105,7 +112,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }