You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:41 UTC

[07/50] [abbrv] beam git commit: Explodes windows before GBKIKWI

Explodes windows before GBKIKWI

Also
* Adds a test for windowed side inputs that requires this
  behavior.
* Adds a test category for SDF with windowed side input.
  Runners should gradually implement this properly. For now,
  only the direct runner implements this properly.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ac3ac50
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ac3ac50
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ac3ac50

Branch: refs/heads/DSL_SQL
Commit: 6ac3ac50fec2eb02927c0a07ca928967cfef5652
Parents: b93de58
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 11:28:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      | 75 +++++++++---------
 .../beam/runners/core/SplittableParDoTest.java  | 82 +++++++-------------
 runners/flink/runner/pom.xml                    |  3 +-
 ...esSplittableParDoWithWindowedSideInputs.java | 26 +++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 41 ++++++++++
 5 files changed, 137 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 44db1f7..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,10 +19,8 @@ package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.UUID;
@@ -138,6 +136,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             .setCoder(splitCoder)
             .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
             .setCoder(splitCoder)
+            // ProcessFn requires all input elements to be in a single window and have a single
+            // element per work item. This must precede the unique keying so each key has a single
+            // associated element.
+            .apply(
+                "Explode windows",
+                ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>()))
             .apply(
                 "Assign unique key",
                 WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
@@ -158,6 +162,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   }
 
   /**
+   * A {@link DoFn} that forces each of its outputs to be in a single window, by indicating to the
+   * runner that it observes the window of its input element, so the runner is forced to apply it to
+   * each input in a single window and thus its output is also in a single window.
+   */
+  private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> {
+    @ProcessElement
+    public void process(ProcessContext c, BoundedWindow window) {
+      c.output(c.element());
+    }
+  }
+
+  /**
    * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
    * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
    *
@@ -317,6 +333,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
    * by creating a tracker for the restriction and checkpointing/resuming processing later if
    * necessary.
+   *
+   * <p>Takes {@link KeyedWorkItem} and assumes that the KeyedWorkItem contains a single element
+   * (or a single timer set by {@link ProcessFn} itself), in a single window. This is necessary
+   * because {@link ProcessFn} sets timers, and timers are namespaced to a single window, which
+   * should be the window of the input element.
+   *
+   * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
    */
   @VisibleForTesting
   public static class ProcessFn<
@@ -441,7 +464,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       // Subsequent calls are timer firings and the element has to be retrieved from the state.
       TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
       boolean isSeedCall = (timer == null);
-      StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+      StateNamespace stateNamespace;
+      if (isSeedCall) {
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+            Iterables.getOnlyElement(c.element().elementsIterable());
+        BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
+        stateNamespace =
+            StateNamespaces.window(
+                (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
+      } else {
+        stateNamespace = timer.getNamespace();
+      }
+
       ValueState<WindowedValue<InputT>> elementState =
           stateInternals.state(stateNamespace, elementTag);
       ValueState<RestrictionT> restrictionState =
@@ -451,15 +485,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
       ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
       if (isSeedCall) {
-        // The element and restriction are available in c.element().
-        // elementsIterable() will, by construction of SplittableParDo, contain the same value
-        // potentially in several different windows. We implode this into a single WindowedValue
-        // in order to simplify the rest of the code and avoid iterating over elementsIterable()
-        // explicitly. The windows of this WindowedValue will be propagated to windows of the
-        // output. This is correct because a splittable DoFn is not allowed to inspect the window
-        // of its element.
         WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
-            implodeWindows(c.element().elementsIterable());
+            Iterables.getOnlyElement(c.element().elementsIterable());
         WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
         elementState.write(element);
         elementAndRestriction =
@@ -498,32 +525,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
               stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
     }
 
-    /**
-     * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
-     * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
-     * of the same value with the same timestamp, but different window sets.
-     *
-     * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
-     * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
-     * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
-     * being delivered separately rather than all at once. It is also legal to do because splittable
-     * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
-     * set of windows of its input to its output.
-     */
-    private static <InputT, RestrictionT>
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
-            Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
-      WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
-          Iterables.getFirst(values, null);
-      checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
-      ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
-      for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
-        windows.addAll(value.getWindows());
-      }
-      return WindowedValue.of(
-          first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
-    }
-
     private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
       return fn.new Context() {
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 5629635..1a44453 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executors;
@@ -194,11 +195,6 @@ public class SplittableParDoTest {
 
   // ------------------------------- Tests for ProcessFn ---------------------------------
 
-  enum WindowExplosion {
-    EXPLODE_WINDOWS,
-    DO_NOT_EXPLODE_WINDOWS
-  }
-
   /**
    * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
    * {@link DoFn.ProcessElement} calls).
@@ -293,24 +289,13 @@ public class SplittableParDoTest {
               ElementAndRestriction.of(element, restriction),
               currentProcessingTime,
               GlobalWindow.INSTANCE,
-              PaneInfo.ON_TIME_AND_ONLY_FIRING),
-          WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
+              PaneInfo.ON_TIME_AND_ONLY_FIRING));
     }
 
-    void startElement(
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
-        WindowExplosion explosion)
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
         throws Exception {
-      switch (explosion) {
-        case EXPLODE_WINDOWS:
-          tester.processElement(
-              KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
-          break;
-        case DO_NOT_EXPLODE_WINDOWS:
-          tester.processElement(
-              KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
-          break;
-      }
+      tester.processElement(
+          KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
     }
 
     /**
@@ -394,46 +379,39 @@ public class SplittableParDoTest {
   }
 
   @Test
-  public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
-    // Tests that ProcessFn correctly propagates windows and timestamp of the element
+  public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
+    // Tests that ProcessFn correctly propagates the window and timestamp of the element
     // inside the KeyedWorkItem.
     // The underlying DoFn is actually monolithic, so this doesn't test splitting.
     DoFn<Integer, String> fn = new ToStringFn();
 
     Instant base = Instant.now();
 
-    IntervalWindow w1 =
+    IntervalWindow w =
         new IntervalWindow(
             base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
-    IntervalWindow w2 =
-        new IntervalWindow(
-            base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
-    IntervalWindow w3 =
-        new IntervalWindow(
-            base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
-
-    for (WindowExplosion explosion : WindowExplosion.values()) {
-      ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
-          new ProcessFnTester<>(
-              base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
-              MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-      tester.startElement(
-          WindowedValue.of(
-              ElementAndRestriction.of(42, new SomeRestriction()),
-              base,
-              Arrays.asList(w1, w2, w3),
-              PaneInfo.ON_TIME_AND_ONLY_FIRING),
-          explosion);
-
-      for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
-        assertEquals(
-            Arrays.asList(
-                TimestampedValue.of("42a", base),
-                TimestampedValue.of("42b", base),
-                TimestampedValue.of("42c", base)),
-            tester.peekOutputElementsInWindow(w));
-      }
-    }
+
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            BigEndianIntegerCoder.of(),
+            SerializableCoder.of(SomeRestriction.class),
+            MAX_OUTPUTS_PER_BUNDLE,
+            MAX_BUNDLE_DURATION);
+    tester.startElement(
+        WindowedValue.of(
+            ElementAndRestriction.of(42, new SomeRestriction()),
+            base,
+            Collections.singletonList(w),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    assertEquals(
+        Arrays.asList(
+            TimestampedValue.of("42a", base),
+            TimestampedValue.of("42b", base),
+            TimestampedValue.of("42c", base)),
+        tester.peekOutputElementsInWindow(w));
   }
 
   private static class WatermarkUpdateFn extends DoFn<Instant, String> {

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 95880f4..1e6452d 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -91,7 +91,8 @@
                     org.apache.beam.sdk.testing.UsesMapState,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream
+                    org.apache.beam.sdk.testing.UsesTestStream,
+                    org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
new file mode 100644
index 0000000..2b1d673
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize splittable {@link ParDo} and use
+ * windowed side inputs.
+ */
+public interface UsesSplittableParDoWithWindowedSideInputs {}

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 30329f4..a0f1fd3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
@@ -252,6 +253,46 @@ public class SplittableDoFnTest implements Serializable {
     p.run();
   }
 
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    UsesSplittableParDo.class,
+    UsesSplittableParDoWithWindowedSideInputs.class
+  })
+  public void testWindowedSideInput() throws Exception {
+    PCollection<Integer> mainInput =
+        p.apply("main",
+                Create.timestamped(
+                    TimestampedValue.of(0, new Instant(0)),
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(4)),
+                    TimestampedValue.of(5, new Instant(5)),
+                    TimestampedValue.of(6, new Instant(6)),
+                    TimestampedValue.of(7, new Instant(7))))
+            .apply("window 2", Window.<Integer>into(FixedWindows.of(Duration.millis(2))));
+
+    PCollectionView<String> sideInput =
+        p.apply("side",
+                Create.timestamped(
+                    TimestampedValue.of("a", new Instant(0)),
+                    TimestampedValue.of("b", new Instant(4))))
+            .apply("window 4", Window.<String>into(FixedWindows.of(Duration.millis(4))))
+            .apply("singleton", View.<String>asSingleton());
+
+    PCollection<String> res =
+        mainInput.apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+    PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
+
+    p.run();
+
+    // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+    // properly access side inputs.
+    // TODO: also add test coverage for when some of the windows of the side input are not ready.
+  }
+
   private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
     private final TupleTag<String> additionalOutput;