You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/16 20:24:10 UTC

[1/2] beam git commit: Tests for reading windowed side input from resumed SDF call

Repository: beam
Updated Branches:
  refs/heads/master aa555f593 -> e827642ef


Tests for reading windowed side input from resumed SDF call


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a6a277c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a6a277c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a6a277c

Branch: refs/heads/master
Commit: 9a6a277cea4582f0a64eac97730cb85af5ba352b
Parents: aa555f5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jun 8 16:54:12 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jun 16 13:09:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/SplittableDoFnTest.java | 145 ++++++++++++++++++-
 1 file changed, 140 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9a6a277c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 02a44d2..646d8d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,10 +18,13 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Ordering;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +32,7 @@ import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -182,6 +186,12 @@ public class SplittableDoFnTest implements Serializable {
   private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
     private static final int MAX_INDEX = 98765;
 
+    private final TupleTag<Integer> numProcessCalls;
+
+    private SDFWithMultipleOutputsPerBlock(TupleTag<Integer> numProcessCalls) {
+      this.numProcessCalls = numProcessCalls;
+    }
+
     private static int snapToNextBlock(int index, int[] blockStarts) {
       for (int i = 1; i < blockStarts.length; ++i) {
         if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
@@ -195,6 +205,7 @@ public class SplittableDoFnTest implements Serializable {
     public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
+      c.output(numProcessCalls, 1);
       for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
         for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
           c.output(index);
@@ -211,10 +222,26 @@ public class SplittableDoFnTest implements Serializable {
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
-    PCollection<Integer> outputs = p.apply(Create.of("foo"))
-        .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
-    PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
+    TupleTag<Integer> main = new TupleTag<>();
+    TupleTag<Integer> numProcessCalls = new TupleTag<>();
+    PCollectionTuple outputs =
+        p.apply(Create.of("foo"))
+            .apply(
+                ParDo.of(new SDFWithMultipleOutputsPerBlock(numProcessCalls))
+                    .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+    PAssert.thatSingleton(outputs.get(main).apply(Count.<Integer>globally()))
         .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
+    // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
+    PAssert.thatSingleton(
+            outputs.get(numProcessCalls).setCoder(VarIntCoder.of()).apply(Sum.integersGlobally()))
+        .satisfies(
+            new SerializableFunction<Integer, Void>() {
+              @Override
+              public Void apply(Integer input) {
+                assertThat(input, greaterThan(1));
+                return null;
+              }
+            });
     p.run();
   }
 
@@ -287,9 +314,117 @@ public class SplittableDoFnTest implements Serializable {
     PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
 
     p.run();
+  }
+
+  @BoundedPerElement
+  private static class SDFWithMultipleOutputsPerBlockAndSideInput
+      extends DoFn<Integer, KV<String, Integer>> {
+    private static final int MAX_INDEX = 98765;
+    private final PCollectionView<String> sideInput;
+    private final TupleTag<Integer> numProcessCalls;
+
+    public SDFWithMultipleOutputsPerBlockAndSideInput(
+        PCollectionView<String> sideInput, TupleTag<Integer> numProcessCalls) {
+      this.sideInput = sideInput;
+      this.numProcessCalls = numProcessCalls;
+    }
+
+    private static int snapToNextBlock(int index, int[] blockStarts) {
+      for (int i = 1; i < blockStarts.length; ++i) {
+        if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
+          return i;
+        }
+      }
+      throw new IllegalStateException("Shouldn't get here");
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+      int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
+      int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
+      c.output(numProcessCalls, 1);
+      for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+        for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
+          c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
+        }
+      }
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(Integer element) {
+      return new OffsetRange(0, MAX_INDEX);
+    }
+  }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    UsesSplittableParDo.class,
+    UsesSplittableParDoWithWindowedSideInputs.class
+  })
+  public void testWindowedSideInputWithCheckpoints() throws Exception {
+    PCollection<Integer> mainInput =
+        p.apply("main",
+                Create.timestamped(
+                    TimestampedValue.of(0, new Instant(0)),
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3))))
+            .apply("window 1", Window.<Integer>into(FixedWindows.of(Duration.millis(1))));
+
+    PCollectionView<String> sideInput =
+        p.apply("side",
+                Create.timestamped(
+                    TimestampedValue.of("a", new Instant(0)),
+                    TimestampedValue.of("b", new Instant(2))))
+            .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
+            .apply("singleton", View.<String>asSingleton());
+
+    TupleTag<KV<String, Integer>> main = new TupleTag<>();
+    TupleTag<Integer> numProcessCalls = new TupleTag<>();
+    PCollectionTuple res =
+        mainInput.apply(
+            ParDo.of(new SDFWithMultipleOutputsPerBlockAndSideInput(sideInput, numProcessCalls))
+                .withSideInputs(sideInput)
+                .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+    PCollection<KV<String, Iterable<Integer>>> grouped =
+        res.get(main).apply(GroupByKey.<String, Integer>create());
+
+    PAssert.that(grouped.apply(Keys.<String>create()))
+        .containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
+    PAssert.that(grouped)
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void>() {
+              @Override
+              public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
+                List<Integer> expected = new ArrayList<>();
+                for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInput.MAX_INDEX; ++i) {
+                  expected.add(i);
+                }
+                for (KV<String, Iterable<Integer>> kv : input) {
+                  assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue()));
+                }
+                return null;
+              }
+            });
+
+    // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
+    PAssert.thatSingleton(
+            res.get(numProcessCalls)
+                .setCoder(VarIntCoder.of())
+                .apply(Sum.integersGlobally().withoutDefaults()))
+        // This should hold in all windows, but verifying a particular window is sufficient.
+        .inOnlyPane(new IntervalWindow(new Instant(0), new Instant(1)))
+        .satisfies(
+            new SerializableFunction<Integer, Void>() {
+              @Override
+              public Void apply(Integer input) {
+                assertThat(input, greaterThan(1));
+                return null;
+              }
+            });
+    p.run();
 
-    // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
-    // properly access side inputs.
     // TODO: also test coverage when some of the windows of the side input are not ready.
   }
 


[2/2] beam git commit: This closes #3335: Tests for reading windowed side input from resumed SDF call

Posted by jk...@apache.org.
This closes #3335: Tests for reading windowed side input from resumed SDF call


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e827642e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e827642e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e827642e

Branch: refs/heads/master
Commit: e827642ef17259a60d1392827b750a798d36f69e
Parents: aa555f5 9a6a277
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 16 13:10:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jun 16 13:10:15 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/SplittableDoFnTest.java | 145 ++++++++++++++++++-
 1 file changed, 140 insertions(+), 5 deletions(-)
----------------------------------------------------------------------