You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/21 10:17:20 UTC
[3/6] flink git commit: [FLINK-3714] Remove Unnecessary Timer in
EventTimeTrigger
[FLINK-3714] Remove Unnecessary Timer in EventTimeTrigger
In onElement() we always registered a timer, even when the watermark was
already past the end of the window and we fire immediately anyway. Now, we only
register a timer if the watermark is not already past the end of the window.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0104a926
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0104a926
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0104a926
Branch: refs/heads/master
Commit: 0104a9260353e9a044504b65d480c83a7d3799fc
Parents: 34a8b03
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 20 16:38:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 21 12:16:19 2016 +0200
----------------------------------------------------------------------
.../api/windowing/triggers/EventTimeTrigger.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0104a926/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index a87e436..96e862f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -35,11 +35,13 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
- ctx.registerEventTimeTimer(window.maxTimestamp());
-
- return (window.maxTimestamp() <= ctx.getCurrentWatermark()) ?
- TriggerResult.FIRE_AND_PURGE :
- TriggerResult.CONTINUE;
+ if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+ // if the watermark is already past the window fire immediately
+ return TriggerResult.FIRE_AND_PURGE;
+ } else {
+ ctx.registerEventTimeTimer(window.maxTimestamp());
+ return TriggerResult.CONTINUE;
+ }
}
@Override