You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:47 UTC
[16/50] [abbrv] beam git commit: Move GC timer checking to
StatefulDoFnRunner.CleanupTimer
Move GC timer checking to StatefulDoFnRunner.CleanupTimer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf6d2748
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf6d2748
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf6d2748
Branch: refs/heads/gearpump-runner
Commit: bf6d2748c8876a7415290069163625598928f02f
Parents: 2c2424c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 08:29:27 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 10 11:09:04 2017 +0100
----------------------------------------------------------------------
.../beam/runners/core/StatefulDoFnRunner.java | 29 ++++++++++++++++----
1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bf6d2748/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 154d8bc..926345e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -115,15 +115,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
- Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
- if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) {
+ if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
- if (isEventTimer || !dropLateData(window)) {
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
- }
+ // a timer can never be late because we don't allow setting timers after GC time
+ doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
}
}
@@ -151,6 +148,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
* Set the garbage collect time of the window to timer.
*/
void setForWindow(BoundedWindow window);
+
+ /**
+ * Checks whether the given timer is a cleanup timer for the window.
+ */
+ boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain);
+
}
/**
@@ -191,6 +198,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
}
+ @Override
+ public boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain) {
+ boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+ }
}
/**