You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/11 01:47:21 UTC

[3/9] beam git commit: Properly deal with late processing-time timers

Properly deal with late processing-time timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86522157
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86522157
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86522157

Branch: refs/heads/release-0.6.0
Commit: 86522157a79fd9a753436312ff8b746cb5740135
Parents: 8fa718d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 15:25:26 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:57 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   | 40 ++++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/86522157/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c672902..d27193c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   }
 
   @Override
-  public void processElement(WindowedValue<InputT> compressedElem) {
+  public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
-    for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+    for (WindowedValue<InputT> value : input.explodeWindows()) {
 
       BoundedWindow window = value.getWindows().iterator().next();
 
-      if (!dropLateData(window)) {
+      if (isLate(window)) {
+        // The element is too late for this window.
+        droppedDueToLateness.addValue(1L);
+        WindowTracing.debug(
+            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+                + "since too far behind inputWatermark:{}",
+            input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
+      } else {
         cleanupTimer.setForWindow(window);
         doFnRunner.processElement(value);
       }
     }
   }
 
-  private boolean dropLateData(BoundedWindow window) {
+  private boolean isLate(BoundedWindow window) {
     Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     Instant inputWM = cleanupTimer.currentInputWatermarkTime();
-    if (gcTime.isBefore(inputWM)) {
-      // The element is too late for this window.
-      droppedDueToLateness.addValue(1L);
-      WindowTracing.debug(
-          "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
-              + "since too far behind inputWatermark:{}", window, inputWM);
-      return true;
-    } else {
-      return false;
-    }
+    return gcTime.isBefore(inputWM);
   }
 
   @Override
@@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
     } else {
-      // a timer can never be late because we don't allow setting timers after GC time
-      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      // An event-time timer can never be late because we don't allow setting timers after GC time.
+      // It can happen that a processing-time timer fires for a late window, we need to ignore
+      // this.
+      if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
+        // don't increment the dropped counter, only do that for elements
+        WindowTracing.debug(
+            "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+                + "since window is too far behind inputWatermark:{}",
+            timestamp, window, cleanupTimer.currentInputWatermarkTime());
+      } else {
+        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      }
     }
   }