You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 17:40:31 UTC
[2/4] incubator-beam git commit: Add DirectRunner Reuse Test
Add DirectRunner Reuse Test
Two calls to run using the Direct Runner should be independent and
succeed independently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4546fd9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4546fd9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4546fd9c
Branch: refs/heads/master
Commit: 4546fd9c5e073eb33787faa302b8695dfd6e04aa
Parents: 7585cfc
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunnerTest.java | 31 ++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4546fd9c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 29dea32..1e73ec0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -109,6 +109,37 @@ public class DirectRunnerTest implements Serializable {
result.awaitCompletion();
}
+ /**
+  * Builds one word-count {@link Pipeline}, attaches a {@link PAssert} on the formatted counts,
+  * and then calls {@code run()} on that same pipeline twice, awaiting completion of each run.
+  *
+  * @throws Throwable propagated from either run
+  */
+ @Test
+ public void reusePipelineSucceeds() throws Throwable {
+   Pipeline pipeline = getPipeline();
+
+   // Source words followed by a pass-through map step ahead of the count.
+   PCollection<String> words =
+       pipeline
+           .apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+           .apply(MapElements.via(new SimpleFunction<String, String>() {
+             @Override
+             public String apply(String word) {
+               return word;
+             }
+           }));
+   PCollection<KV<String, Long>> perWordCounts = words.apply(Count.<String>perElement());
+
+   // Render every (word, count) pair as "word: count".
+   PCollection<String> formattedCounts =
+       perWordCounts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+         @Override
+         public String apply(KV<String, Long> wordCount) {
+           return String.format("%s: %s", wordCount.getKey(), wordCount.getValue());
+         }
+       }));
+
+   PAssert.that(formattedCounts).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+   // First run of the pipeline.
+   DirectPipelineResult firstRun = (DirectPipelineResult) pipeline.run();
+   firstRun.awaitCompletion();
+
+   // Second run, reusing the very same Pipeline object.
+   DirectPipelineResult secondRun = (DirectPipelineResult) pipeline.run();
+   secondRun.awaitCompletion();
+ }
+
@Test(timeout = 5000L)
public void byteArrayCountShouldSucceed() {
Pipeline p = getPipeline();