You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:36 UTC

[24/50] [abbrv] beam git commit: Ignore processing time timers in expired windows

Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/951f3cab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/951f3cab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/951f3cab

Branch: refs/heads/DSL_SQL
Commit: 951f3cab3f6558524ee1146e0e3f347bcd02ecda
Parents: c167d10
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       | 10 ++++++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 32 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       @SuppressWarnings("unchecked")
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
       W window = windowNamespace.getWindow();
+
+      if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+        continue;
+      }
+
       ReduceFn<K, InputT, OutputT, W>.Context directContext =
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
   }
 
+  private boolean windowIsExpired(BoundedWindow w) {
+    return timerInternals
+        .currentInputWatermarkTime()
+        .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
 
   /**
    * Tests that when a processing time timer comes in after a window is expired
+   * it is just ignored.
+   */
+  @Test
+  public void testLateProcessingTimeTimer() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    // After this advancement, the window is expired and only the GC process
+    // should be allowed to touch it
+    tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+    // This should not output
+    tester.advanceProcessingTime(new Instant(6000));
+
+    assertThat(tester.extractOutput(), emptyIterable());
+  }
+
+  /**
+   * Tests that when a processing time timer comes in after a window is expired
    * but in the same bundle it does not cause a spurious output.
    */
   @Test