You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 17:40:31 UTC

[2/4] incubator-beam git commit: Add DirectRunner Reuse Test

Add DirectRunner Reuse Test

Two calls to run using the Direct Runner should be independent and
succeed independently.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4546fd9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4546fd9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4546fd9c

Branch: refs/heads/master
Commit: 4546fd9c5e073eb33787faa302b8695dfd6e04aa
Parents: 7585cfc
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4546fd9c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 29dea32..1e73ec0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -109,6 +109,37 @@ public class DirectRunnerTest implements Serializable {
     result.awaitCompletion();
   }
 
+  @Test
+  public void reusePipelineSucceeds() throws Throwable { // runs one Pipeline object twice
+    Pipeline p = getPipeline();
+    // Count occurrences of each input string; the first MapElements returns its input as-is.
+    PCollection<KV<String, Long>> counts =
+        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+            .apply(MapElements.via(new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return input;
+              }
+            }))
+            .apply(Count.<String>perElement());
+    PCollection<String> countStrs =
+        counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+          @Override
+          public String apply(KV<String, Long> input) {
+            String str = String.format("%s: %s", input.getKey(), input.getValue());
+            return str;
+          }
+        }));
+    // Each distinct input is expected exactly once, rendered as "<key>: <value>".
+    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+    DirectPipelineResult result = ((DirectPipelineResult) p.run()); // first run
+    result.awaitCompletion();
+    // Run the same Pipeline object a second time and wait for that run to complete too.
+    DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
+    otherResult.awaitCompletion();
+  }
+
   @Test(timeout = 5000L)
   public void byteArrayCountShouldSucceed() {
     Pipeline p = getPipeline();