You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:39 UTC

[05/50] [abbrv] beam git commit: Separates side input test and side output test

Separates side input test and side output test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a51bdd26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a51bdd26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a51bdd26

Branch: refs/heads/DSL_SQL
Commit: a51bdd266f9c877cb407de986a465fc9c7de76ff
Parents: a9bcc8b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:38:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/SplittableDoFnTest.java | 63 ++++++++++++++------
 1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a51bdd26/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 9e8c12e..30329f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,7 +62,7 @@ import org.junit.runners.JUnit4;
  * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior.
  */
 @RunWith(JUnit4.class)
-public class SplittableDoFnTest {
+public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
@@ -216,22 +217,18 @@ public class SplittableDoFnTest {
     p.run();
   }
 
-  private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+  private static class SDFWithSideInput extends DoFn<Integer, String> {
     private final PCollectionView<String> sideInput;
-    private final TupleTag<String> additionalOutput;
 
-    private SDFWithSideInputsAndOutputs(
-        PCollectionView<String> sideInput, TupleTag<String> additionalOutput) {
+    private SDFWithSideInput(PCollectionView<String> sideInput) {
       this.sideInput = sideInput;
-      this.additionalOutput = additionalOutput;
     }
 
     @ProcessElement
     public void process(ProcessContext c, OffsetRangeTracker tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       String side = c.sideInput(sideInput);
-      c.output("main:" + side + ":" + c.element());
-      c.output(additionalOutput, "additional:" + side + ":" + c.element());
+      c.output(side + ":" + c.element());
     }
 
     @GetInitialRestriction
@@ -242,27 +239,55 @@ public class SplittableDoFnTest {
 
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
-  public void testSideInputsAndOutputs() throws Exception {
-
+  public void testSideInput() throws Exception {
     PCollectionView<String> sideInput =
         p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
-    TupleTag<String> mainOutputTag = new TupleTag<>("main");
-    TupleTag<String> additionalOutputTag = new TupleTag<>("additional");
+
+    PCollection<String> res =
+        p.apply("input", Create.of(0, 1, 2))
+            .apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+    PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2"));
+
+    p.run();
+  }
+
+  private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
+    private final TupleTag<String> additionalOutput;
+
+    private SDFWithAdditionalOutput(TupleTag<String> additionalOutput) {
+      this.additionalOutput = additionalOutput;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
+      c.output("main:" + c.element());
+      c.output(additionalOutput, "additional:" + c.element());
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Integer value) {
+      return new OffsetRange(0, 1);
+    }
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
+  public void testAdditionalOutput() throws Exception {
+    TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
+    TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};
 
     PCollectionTuple res =
         p.apply("input", Create.of(0, 1, 2))
             .apply(
-                ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag))
-                    .withSideInputs(sideInput)
+                ParDo.of(new SDFWithAdditionalOutput(additionalOutputTag))
                     .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
-    res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
-    res.get(additionalOutputTag).setCoder(StringUtf8Coder.of());
 
     PAssert.that(res.get(mainOutputTag))
-        .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
+        .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
     PAssert.that(res.get(additionalOutputTag))
-        .containsInAnyOrder(
-            Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2"));
+        .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));
 
     p.run();
   }