You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/16 20:24:10 UTC
[1/2] beam git commit: Tests for reading windowed side input from resumed SDF call
Repository: beam
Updated Branches:
refs/heads/master aa555f593 -> e827642ef
Tests for reading windowed side input from resumed SDF call
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a6a277c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a6a277c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a6a277c
Branch: refs/heads/master
Commit: 9a6a277cea4582f0a64eac97730cb85af5ba352b
Parents: aa555f5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jun 8 16:54:12 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jun 16 13:09:50 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/SplittableDoFnTest.java | 145 ++++++++++++++++++-
1 file changed, 140 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9a6a277c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 02a44d2..646d8d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,10 +18,13 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Ordering;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,6 +32,7 @@ import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -182,6 +186,12 @@ public class SplittableDoFnTest implements Serializable {
private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
private static final int MAX_INDEX = 98765;
+ private final TupleTag<Integer> numProcessCalls;
+
+ private SDFWithMultipleOutputsPerBlock(TupleTag<Integer> numProcessCalls) {
+ this.numProcessCalls = numProcessCalls;
+ }
+
private static int snapToNextBlock(int index, int[] blockStarts) {
for (int i = 1; i < blockStarts.length; ++i) {
if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
@@ -195,6 +205,7 @@ public class SplittableDoFnTest implements Serializable {
public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
+ c.output(numProcessCalls, 1);
for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
c.output(index);
@@ -211,10 +222,26 @@ public class SplittableDoFnTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
public void testOutputAfterCheckpoint() throws Exception {
- PCollection<Integer> outputs = p.apply(Create.of("foo"))
- .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
- PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
+ TupleTag<Integer> main = new TupleTag<>();
+ TupleTag<Integer> numProcessCalls = new TupleTag<>();
+ PCollectionTuple outputs =
+ p.apply(Create.of("foo"))
+ .apply(
+ ParDo.of(new SDFWithMultipleOutputsPerBlock(numProcessCalls))
+ .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+ PAssert.thatSingleton(outputs.get(main).apply(Count.<Integer>globally()))
.isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
+ // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
+ PAssert.thatSingleton(
+ outputs.get(numProcessCalls).setCoder(VarIntCoder.of()).apply(Sum.integersGlobally()))
+ .satisfies(
+ new SerializableFunction<Integer, Void>() {
+ @Override
+ public Void apply(Integer input) {
+ assertThat(input, greaterThan(1));
+ return null;
+ }
+ });
p.run();
}
@@ -287,9 +314,117 @@ public class SplittableDoFnTest implements Serializable {
PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
p.run();
+ }
+
+ @BoundedPerElement
+ private static class SDFWithMultipleOutputsPerBlockAndSideInput
+ extends DoFn<Integer, KV<String, Integer>> {
+ private static final int MAX_INDEX = 98765;
+ private final PCollectionView<String> sideInput;
+ private final TupleTag<Integer> numProcessCalls;
+
+ public SDFWithMultipleOutputsPerBlockAndSideInput(
+ PCollectionView<String> sideInput, TupleTag<Integer> numProcessCalls) {
+ this.sideInput = sideInput;
+ this.numProcessCalls = numProcessCalls;
+ }
+
+ private static int snapToNextBlock(int index, int[] blockStarts) {
+ for (int i = 1; i < blockStarts.length; ++i) {
+ if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
+ return i;
+ }
+ }
+ throw new IllegalStateException("Shouldn't get here");
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
+ int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
+ c.output(numProcessCalls, 1);
+ for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+ for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
+ c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
+ }
+ }
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(Integer element) {
+ return new OffsetRange(0, MAX_INDEX);
+ }
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesSplittableParDo.class,
+ UsesSplittableParDoWithWindowedSideInputs.class
+ })
+ public void testWindowedSideInputWithCheckpoints() throws Exception {
+ PCollection<Integer> mainInput =
+ p.apply("main",
+ Create.timestamped(
+ TimestampedValue.of(0, new Instant(0)),
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)),
+ TimestampedValue.of(3, new Instant(3))))
+ .apply("window 1", Window.<Integer>into(FixedWindows.of(Duration.millis(1))));
+
+ PCollectionView<String> sideInput =
+ p.apply("side",
+ Create.timestamped(
+ TimestampedValue.of("a", new Instant(0)),
+ TimestampedValue.of("b", new Instant(2))))
+ .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
+ .apply("singleton", View.<String>asSingleton());
+
+ TupleTag<KV<String, Integer>> main = new TupleTag<>();
+ TupleTag<Integer> numProcessCalls = new TupleTag<>();
+ PCollectionTuple res =
+ mainInput.apply(
+ ParDo.of(new SDFWithMultipleOutputsPerBlockAndSideInput(sideInput, numProcessCalls))
+ .withSideInputs(sideInput)
+ .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+ PCollection<KV<String, Iterable<Integer>>> grouped =
+ res.get(main).apply(GroupByKey.<String, Integer>create());
+
+ PAssert.that(grouped.apply(Keys.<String>create()))
+ .containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
+ PAssert.that(grouped)
+ .satisfies(
+ new SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void>() {
+ @Override
+ public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
+ List<Integer> expected = new ArrayList<>();
+ for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInput.MAX_INDEX; ++i) {
+ expected.add(i);
+ }
+ for (KV<String, Iterable<Integer>> kv : input) {
+ assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue()));
+ }
+ return null;
+ }
+ });
+
+ // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
+ PAssert.thatSingleton(
+ res.get(numProcessCalls)
+ .setCoder(VarIntCoder.of())
+ .apply(Sum.integersGlobally().withoutDefaults()))
+ // This should hold in all windows, but verifying a particular window is sufficient.
+ .inOnlyPane(new IntervalWindow(new Instant(0), new Instant(1)))
+ .satisfies(
+ new SerializableFunction<Integer, Void>() {
+ @Override
+ public Void apply(Integer input) {
+ assertThat(input, greaterThan(1));
+ return null;
+ }
+ });
+ p.run();
- // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
- // properly access side inputs.
// TODO: also test coverage when some of the windows of the side input are not ready.
}
[2/2] beam git commit: This closes #3335: Tests for reading windowed side input from resumed SDF call
Posted by jk...@apache.org.
This closes #3335: Tests for reading windowed side input from resumed SDF call
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e827642e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e827642e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e827642e
Branch: refs/heads/master
Commit: e827642ef17259a60d1392827b750a798d36f69e
Parents: aa555f5 9a6a277
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 16 13:10:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jun 16 13:10:15 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/SplittableDoFnTest.java | 145 ++++++++++++++++++-
1 file changed, 140 insertions(+), 5 deletions(-)
----------------------------------------------------------------------