You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/10 23:20:26 UTC
[1/2] incubator-beam git commit: Demonstrate that the DirectRunner runs per-call
Repository: incubator-beam
Updated Branches:
refs/heads/master 6da92ad94 -> 2a1055dd8
Demonstrate that the DirectRunner runs per-call
Add a counter field that is incremented once per output element; because the pipeline is run twice, each element should be counted twice.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65f9076d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65f9076d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65f9076d
Branch: refs/heads/master
Commit: 65f9076d654be02cbdc07442d008f6c5245d1ab5
Parents: 6da92ad
Author: Thomas Groh <tg...@google.com>
Authored: Wed Aug 10 11:29:38 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 10 15:54:09 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectRunnerTest.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65f9076d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 1e73ec0..ddce458 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -58,6 +60,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests for basic {@link DirectRunner} functionality.
@@ -109,10 +112,12 @@ public class DirectRunnerTest implements Serializable {
result.awaitCompletion();
}
+ private static AtomicInteger changed;
@Test
public void reusePipelineSucceeds() throws Throwable {
Pipeline p = getPipeline();
+ changed = new AtomicInteger(0);
PCollection<KV<String, Long>> counts =
p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
.apply(MapElements.via(new SimpleFunction<String, String>() {
@@ -131,6 +136,14 @@ public class DirectRunnerTest implements Serializable {
}
}));
+ counts.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
+ @ProcessElement
+ public void updateChanged(ProcessContext c) {
+ changed.getAndIncrement();
+ }
+ }));
+
+
PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
DirectPipelineResult result = ((DirectPipelineResult) p.run());
@@ -138,6 +151,8 @@ public class DirectRunnerTest implements Serializable {
DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
otherResult.awaitCompletion();
+
+ assertThat("Each element should have been processed twice", changed.get(), equalTo(6));
}
@Test(timeout = 5000L)
[2/2] incubator-beam git commit: This closes #811
Posted by bc...@apache.org.
This closes #811
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a1055dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a1055dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a1055dd
Branch: refs/heads/master
Commit: 2a1055dd8e669ee9181bda3d944e377f7e33b5ea
Parents: 6da92ad 65f9076
Author: bchambers <bc...@google.com>
Authored: Wed Aug 10 15:54:36 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 10 15:54:36 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectRunnerTest.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------