You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 04:50:01 UTC

[31/43] beam git commit: Process timer firings for a window together

Process timer firings for a window together


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/935c0773
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/935c0773
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/935c0773

Branch: refs/heads/gearpump-runner
Commit: 935c077341de580dddd4b29ffee3926795acf403
Parents: bd631b8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:43:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 14:12:39 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoardTest.java |  2 +
 .../beam/runners/core/ReduceFnRunner.java       | 98 +++++++++++++-------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 49 +++++++++-
 3 files changed, 115 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 745c210..611e2b3 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable {
         .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
             event(TestUser.BLUE_TWO, 3, Duration.ZERO),
             event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+        // Move the watermark to the end of the window to output on time
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
         // Move the watermark past the end of the allowed lateness plus the end of the window
         .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
             .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))

http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0632c05..634a2d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -638,11 +637,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Enriches TimerData with state necessary for processing a timer as well as
-   * common queries about a timer.
+   * A descriptor of the activation for a window based on a timer.
    */
-  private class EnrichedTimerData {
-    public final Instant timestamp;
+  private class WindowActivation {
     public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
     public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
     // If this is an end-of-window timer then we may need to set a garbage collection timer
@@ -653,19 +650,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // end-of-window time to be a signal to garbage collect.
     public final boolean isGarbageCollection;
 
-    EnrichedTimerData(
-        TimerData timer,
+    WindowActivation(
         ReduceFn<K, InputT, OutputT, W>.Context directContext,
         ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
-      this.timestamp = timer.getTimestamp();
       this.directContext = directContext;
       this.renamedContext = renamedContext;
       W window = directContext.window();
-      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-          && timer.getTimestamp().equals(window.maxTimestamp());
-      Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
+
+      // The output watermark is before the end of the window if it is either unknown
+      // or it is known to be before it. If it is unknown, that means that there hasn't been
+      // enough data to advance it.
+      boolean outputWatermarkBeforeEOW =
+              timerInternals.currentOutputWatermarkTime() == null
+          || !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
+
+      // The "end of the window" is reached when the local input watermark (for this key) surpasses
+      // it but the local output watermark (also for this key) has not. After data is emitted and
+      // the output watermark hold is released, the output watermark on this key will immediately
+      // exceed the end of the window (otherwise we could see multiple ON_TIME outputs)
+      this.isEndOfWindow =
+          timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp())
+              && outputWatermarkBeforeEOW;
+
+      // The "GC time" is reached when the input watermark surpasses the end of the window
+      // plus allowed lateness. After this, the window is expired and expunged.
       this.isGarbageCollection =
-          TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime);
+          timerInternals
+              .currentInputWatermarkTime()
+              .isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy));
     }
 
     // Has this window had its trigger finish?
@@ -684,9 +696,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       return;
     }
 
-    // Create a reusable context for each timer and begin prefetching necessary
+    // Create a reusable context for each window and begin prefetching necessary
     // state.
-    List<EnrichedTimerData> enrichedTimers = new LinkedList();
+    Map<BoundedWindow, WindowActivation> windowActivations = new HashMap();
+
     for (TimerData timer : timers) {
       checkArgument(timer.getNamespace() instanceof WindowNamespace,
           "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
@@ -694,7 +707,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
       W window = windowNamespace.getWindow();
 
-      if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+      WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with "
+              + "inputWatermark:{}; outputWatermark:{}",
+          ReduceFnRunner.class.getSimpleName(),
+          key, window, timer,
+          timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+
+      // Processing time timers for an expired window are ignored, just like elements
+      // that show up too late. Window GC is managed by an event-time timer.
+      if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) {
+        continue;
+      }
+
+      // How a window is processed is a function only of the current state, not the details
+      // of the timer. This makes us robust to large leaps in processing time and watermark
+      // time, where both EOW and GC timers come in together and we need to GC and emit
+      // the final pane.
+      if (windowActivations.containsKey(window)) {
         continue;
       }
 
@@ -702,11 +732,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
           contextFactory.base(window, StateStyle.RENAMED);
-      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
-      enrichedTimers.add(enrichedTimer);
+      WindowActivation windowActivation = new WindowActivation(directContext, renamedContext);
+      windowActivations.put(window, windowActivation);
 
       // Perform prefetching of state to determine if the trigger should fire.
-      if (enrichedTimer.isGarbageCollection) {
+      if (windowActivation.isGarbageCollection) {
         triggerRunner.prefetchIsClosed(directContext.state());
       } else {
         triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
@@ -714,7 +744,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     // For those windows that are active and open, prefetch the triggering or emitting state.
-    for (EnrichedTimerData timer : enrichedTimers) {
+    for (WindowActivation timer : windowActivations.values()) {
       if (timer.windowIsActiveAndOpen()) {
         ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
         if (timer.isGarbageCollection) {
@@ -727,25 +757,27 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     // Perform processing now that everything is prefetched.
-    for (EnrichedTimerData timer : enrichedTimers) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+    for (WindowActivation windowActivation : windowActivations.values()) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext;
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext;
 
-      if (timer.isGarbageCollection) {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
-                + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
+      if (windowActivation.isGarbageCollection) {
+        WindowTracing.debug(
+            "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}",
+            ReduceFnRunner.class.getSimpleName(),
+            key,
+            directContext.window(),
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
 
-        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+        boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
         if (windowIsActiveAndOpen) {
           // We need to call onTrigger to emit the final pane if required.
           // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
           // and the watermark has passed the end of the window.
           @Nullable
           Instant newHold = onTrigger(
-              directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+              directContext, renamedContext, true /* isFinished */, windowActivation.isEndOfWindow);
           checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
         }
 
@@ -753,18 +785,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         // see elements for it again.
         clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
       } else {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+        WindowTracing.debug(
+            "{}.onTimers: Triggering for key:{}; window:{} at {} with "
                 + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
+            key,
+            directContext.window(),
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        if (timer.windowIsActiveAndOpen()
+        if (windowActivation.windowIsActiveAndOpen()
             && triggerRunner.shouldFire(
                    directContext.window(), directContext.timers(), directContext.state())) {
           emit(directContext, renamedContext);
         }
 
-        if (timer.isEndOfWindow) {
+        if (windowActivation.isEndOfWindow) {
           // If the window strategy trigger includes a watermark trigger then at this point
           // there should be no data holds, either because we'd already cleared them on an
           // earlier onTrigger, or because we just cleared them on the above emit.

http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 79ee91b..4f13af1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -79,7 +80,6 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -246,6 +246,52 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
   }
 
+  /**
+   * Tests that with the default trigger we will not produce two ON_TIME panes, even
+   * if there are two outputs that are both candidates.
+   */
+  @Test
+  public void testOnlyOneOnTimePane() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withTrigger(DefaultTrigger.of())
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceInputWatermark(new Instant(0));
+
+    int value1 = 1;
+    int value2 = 3;
+
+    // A single element that should be in the ON_TIME output
+    tester.injectElements(
+        TimestampedValue.of(value1, new Instant(1)));
+
+    // Should fire ON_TIME
+    tester.advanceInputWatermark(new Instant(10));
+
+    // The DefaultTrigger should cause output labeled LATE, even though it does not have to be
+    // labeled as such.
+    tester.injectElements(
+        TimestampedValue.of(value2, new Instant(3)));
+
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1)));
+    assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2)));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1)));
+  }
+
   @Test
   public void testOnElementCombiningDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
@@ -458,7 +504,6 @@ public class ReduceFnRunnerTest {
    * marked as final.
    */
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-2505")
   public void testCombiningAccumulatingEventTime() throws Exception {
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))