You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/10 21:02:21 UTC
[3/4] beam git commit: Properly deal with late processing-time timers
Properly deal with late processing-time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbfcf4b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbfcf4b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbfcf4b4
Branch: refs/heads/master
Commit: dbfcf4b4a63b38653adc21d1cf37d6c4cfd955ad
Parents: 1a8e1f7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 15:25:26 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 10 15:25:26 2017 +0100
----------------------------------------------------------------------
.../beam/runners/core/StatefulDoFnRunner.java | 40 ++++++++++++--------
1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dbfcf4b4/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c672902..d27193c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
}
@Override
- public void processElement(WindowedValue<InputT> compressedElem) {
+ public void processElement(WindowedValue<InputT> input) {
// StatefulDoFnRunner always observes windows, so we need to explode
- for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+ for (WindowedValue<InputT> value : input.explodeWindows()) {
BoundedWindow window = value.getWindows().iterator().next();
- if (!dropLateData(window)) {
+ if (isLate(window)) {
+ // The element is too late for this window.
+ droppedDueToLateness.addValue(1L);
+ WindowTracing.debug(
+ "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ + "since too far behind inputWatermark:{}",
+ input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
+ } else {
cleanupTimer.setForWindow(window);
doFnRunner.processElement(value);
}
}
}
- private boolean dropLateData(BoundedWindow window) {
+ private boolean isLate(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant inputWM = cleanupTimer.currentInputWatermarkTime();
- if (gcTime.isBefore(inputWM)) {
- // The element is too late for this window.
- droppedDueToLateness.addValue(1L);
- WindowTracing.debug(
- "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
- + "since too far behind inputWatermark:{}", window, inputWM);
- return true;
- } else {
- return false;
- }
+ return gcTime.isBefore(inputWM);
}
@Override
@@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
- // a timer can never be late because we don't allow setting timers after GC time
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ // An event-time timer can never be late because we don't allow setting timers after GC time.
+ // It can happen that a processing-time timer fires for a late window; we need to ignore
+ // this.
+ if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
+ // don't increment the dropped counter, only do that for elements
+ WindowTracing.debug(
+ "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+ + "since window is too far behind inputWatermark:{}",
+ timestamp, window, cleanupTimer.currentInputWatermarkTime());
+ } else {
+ doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ }
}
}