You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:41 UTC
[07/50] [abbrv] beam git commit: Explodes windows before GBKIKWI
Explodes windows before GBKIKWI
Also
* Adds a test for windowed side inputs that requires this
behavior.
* Adds a test category for SDF with windowed side input.
Runners should gradually implement this properly. For now,
only the direct runner implements this properly.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ac3ac50
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ac3ac50
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ac3ac50
Branch: refs/heads/DSL_SQL
Commit: 6ac3ac50fec2eb02927c0a07ca928967cfef5652
Parents: b93de58
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 11:28:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDo.java | 75 +++++++++---------
.../beam/runners/core/SplittableParDoTest.java | 82 +++++++-------------
runners/flink/runner/pom.xml | 3 +-
...esSplittableParDoWithWindowedSideInputs.java | 26 +++++++
.../beam/sdk/transforms/SplittableDoFnTest.java | 41 ++++++++++
5 files changed, 137 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 44db1f7..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,10 +19,8 @@ package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.UUID;
@@ -138,6 +136,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
.setCoder(splitCoder)
.apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
.setCoder(splitCoder)
+ // ProcessFn requires all input elements to be in a single window and have a single
+ // element per work item. This must precede the unique keying so each key has a single
+ // associated element.
+ .apply(
+ "Explode windows",
+ ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>()))
.apply(
"Assign unique key",
WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
@@ -158,6 +162,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
/**
+ * A {@link DoFn} that forces each of its outputs to be in a single window, by indicating to the
+ * runner that it observes the window of its input element, so the runner is forced to apply it to
+ * each input in a single window and thus its output is also in a single window.
+ */
+ private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> {
+ @ProcessElement
+ public void process(ProcessContext c, BoundedWindow window) {
+ c.output(c.element());
+ }
+ }
+
+ /**
* Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
* {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
*
@@ -317,6 +333,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
* by creating a tracker for the restriction and checkpointing/resuming processing later if
* necessary.
+ *
+ * <p>Takes {@link KeyedWorkItem} and assumes that the KeyedWorkItem contains a single element
+ * (or a single timer set by {@link ProcessFn} itself), in a single window. This is necessary
+ * because {@link ProcessFn} sets timers, and timers are namespaced to a single window and it
+ * should be the window of the input element.
+ *
+ * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
*/
@VisibleForTesting
public static class ProcessFn<
@@ -441,7 +464,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
// Subsequent calls are timer firings and the element has to be retrieved from the state.
TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
boolean isSeedCall = (timer == null);
- StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+ StateNamespace stateNamespace;
+ if (isSeedCall) {
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+ Iterables.getOnlyElement(c.element().elementsIterable());
+ BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
+ stateNamespace =
+ StateNamespaces.window(
+ (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
+ } else {
+ stateNamespace = timer.getNamespace();
+ }
+
ValueState<WindowedValue<InputT>> elementState =
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
@@ -451,15 +485,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
if (isSeedCall) {
- // The element and restriction are available in c.element().
- // elementsIterable() will, by construction of SplittableParDo, contain the same value
- // potentially in several different windows. We implode this into a single WindowedValue
- // in order to simplify the rest of the code and avoid iterating over elementsIterable()
- // explicitly. The windows of this WindowedValue will be propagated to windows of the
- // output. This is correct because a splittable DoFn is not allowed to inspect the window
- // of its element.
WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
- implodeWindows(c.element().elementsIterable());
+ Iterables.getOnlyElement(c.element().elementsIterable());
WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
elementState.write(element);
elementAndRestriction =
@@ -498,32 +525,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
}
- /**
- * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
- * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
- * of the same value with the same timestamp, but different window sets.
- *
- * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
- * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
- * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
- * being delivered separately rather than all at once. It is also legal to do because splittable
- * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
- * set of windows of its input to its output.
- */
- private static <InputT, RestrictionT>
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
- Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
- Iterables.getFirst(values, null);
- checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
- ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
- for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
- windows.addAll(value.getWindows());
- }
- return WindowedValue.of(
- first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
- }
-
private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
return fn.new Context() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 5629635..1a44453 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
@@ -194,11 +195,6 @@ public class SplittableParDoTest {
// ------------------------------- Tests for ProcessFn ---------------------------------
- enum WindowExplosion {
- EXPLODE_WINDOWS,
- DO_NOT_EXPLODE_WINDOWS
- }
-
/**
* A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
* {@link DoFn.ProcessElement} calls).
@@ -293,24 +289,13 @@ public class SplittableParDoTest {
ElementAndRestriction.of(element, restriction),
currentProcessingTime,
GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
- WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
- void startElement(
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
- WindowExplosion explosion)
+ void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
throws Exception {
- switch (explosion) {
- case EXPLODE_WINDOWS:
- tester.processElement(
- KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
- break;
- case DO_NOT_EXPLODE_WINDOWS:
- tester.processElement(
- KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
- break;
- }
+ tester.processElement(
+ KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
}
/**
@@ -394,46 +379,39 @@ public class SplittableParDoTest {
}
@Test
- public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
- // Tests that ProcessFn correctly propagates windows and timestamp of the element
+ public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
+ // Tests that ProcessFn correctly propagates the window and timestamp of the element
// inside the KeyedWorkItem.
// The underlying DoFn is actually monolithic, so this doesn't test splitting.
DoFn<Integer, String> fn = new ToStringFn();
Instant base = Instant.now();
- IntervalWindow w1 =
+ IntervalWindow w =
new IntervalWindow(
base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
- IntervalWindow w2 =
- new IntervalWindow(
- base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
- IntervalWindow w3 =
- new IntervalWindow(
- base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
-
- for (WindowExplosion explosion : WindowExplosion.values()) {
- ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
- new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
- MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
- tester.startElement(
- WindowedValue.of(
- ElementAndRestriction.of(42, new SomeRestriction()),
- base,
- Arrays.asList(w1, w2, w3),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
- explosion);
-
- for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
- assertEquals(
- Arrays.asList(
- TimestampedValue.of("42a", base),
- TimestampedValue.of("42b", base),
- TimestampedValue.of("42c", base)),
- tester.peekOutputElementsInWindow(w));
- }
- }
+
+ ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ BigEndianIntegerCoder.of(),
+ SerializableCoder.of(SomeRestriction.class),
+ MAX_OUTPUTS_PER_BUNDLE,
+ MAX_BUNDLE_DURATION);
+ tester.startElement(
+ WindowedValue.of(
+ ElementAndRestriction.of(42, new SomeRestriction()),
+ base,
+ Collections.singletonList(w),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ assertEquals(
+ Arrays.asList(
+ TimestampedValue.of("42a", base),
+ TimestampedValue.of("42b", base),
+ TimestampedValue.of("42c", base)),
+ tester.peekOutputElementsInWindow(w));
}
private static class WatermarkUpdateFn extends DoFn<Instant, String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 95880f4..1e6452d 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -91,7 +91,8 @@
org.apache.beam.sdk.testing.UsesMapState,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
- org.apache.beam.sdk.testing.UsesTestStream
+ org.apache.beam.sdk.testing.UsesTestStream,
+ org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
new file mode 100644
index 0000000..2b1d673
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize splittable {@link ParDo} and use
+ * windowed side inputs.
+ */
+public interface UsesSplittableParDoWithWindowedSideInputs {}
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 30329f4..a0f1fd3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
@@ -252,6 +253,46 @@ public class SplittableDoFnTest implements Serializable {
p.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesSplittableParDo.class,
+ UsesSplittableParDoWithWindowedSideInputs.class
+ })
+ public void testWindowedSideInput() throws Exception {
+ PCollection<Integer> mainInput =
+ p.apply("main",
+ Create.timestamped(
+ TimestampedValue.of(0, new Instant(0)),
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)),
+ TimestampedValue.of(3, new Instant(3)),
+ TimestampedValue.of(4, new Instant(4)),
+ TimestampedValue.of(5, new Instant(5)),
+ TimestampedValue.of(6, new Instant(6)),
+ TimestampedValue.of(7, new Instant(7))))
+ .apply("window 2", Window.<Integer>into(FixedWindows.of(Duration.millis(2))));
+
+ PCollectionView<String> sideInput =
+ p.apply("side",
+ Create.timestamped(
+ TimestampedValue.of("a", new Instant(0)),
+ TimestampedValue.of("b", new Instant(4))))
+ .apply("window 4", Window.<String>into(FixedWindows.of(Duration.millis(4))))
+ .apply("singleton", View.<String>asSingleton());
+
+ PCollection<String> res =
+ mainInput.apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+ PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
+
+ p.run();
+
+ // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+ // properly access side inputs.
+ // TODO: also add test coverage when some of the windows of the side input are not ready.
+ }
+
private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
private final TupleTag<String> additionalOutput;