You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/10 23:20:26 UTC

[1/2] incubator-beam git commit: Demonstrate that the DirectRunner runs per-call

Repository: incubator-beam
Updated Branches:
  refs/heads/master 6da92ad94 -> 2a1055dd8


Demonstrate that the DirectRunner runs per-call

Add a static counter that is incremented once for each output element. Because the test runs the same pipeline twice, every element should be counted twice.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65f9076d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65f9076d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65f9076d

Branch: refs/heads/master
Commit: 65f9076d654be02cbdc07442d008f6c5245d1ab5
Parents: 6da92ad
Author: Thomas Groh <tg...@google.com>
Authored: Wed Aug 10 11:29:38 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 10 15:54:09 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectRunnerTest.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65f9076d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 1e73ec0..ddce458 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -58,6 +60,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests for basic {@link DirectRunner} functionality.
@@ -109,10 +112,12 @@ public class DirectRunnerTest implements Serializable {
     result.awaitCompletion();
   }
 
+  private static AtomicInteger changed;
   @Test
   public void reusePipelineSucceeds() throws Throwable {
     Pipeline p = getPipeline();
 
+    changed = new AtomicInteger(0);
     PCollection<KV<String, Long>> counts =
         p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
             .apply(MapElements.via(new SimpleFunction<String, String>() {
@@ -131,6 +136,14 @@ public class DirectRunnerTest implements Serializable {
           }
         }));
 
+    counts.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
+      @ProcessElement
+      public void updateChanged(ProcessContext c) {
+        changed.getAndIncrement();
+      }
+    }));
+
+
     PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
 
     DirectPipelineResult result = ((DirectPipelineResult) p.run());
@@ -138,6 +151,8 @@ public class DirectRunnerTest implements Serializable {
 
     DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
     otherResult.awaitCompletion();
+
+    assertThat("Each element should have been processed twice", changed.get(), equalTo(6));
   }
 
   @Test(timeout = 5000L)


[2/2] incubator-beam git commit: This closes #811

Posted by bc...@apache.org.
This closes #811


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a1055dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a1055dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a1055dd

Branch: refs/heads/master
Commit: 2a1055dd8e669ee9181bda3d944e377f7e33b5ea
Parents: 6da92ad 65f9076
Author: bchambers <bc...@google.com>
Authored: Wed Aug 10 15:54:36 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 10 15:54:36 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectRunnerTest.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------