You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/05/14 08:11:10 UTC
[2/2] flink git commit: [FLINK-9201] Fixed the same merge window will
be fired twice if watermark already passed the merge window
[FLINK-9201] Fixed the same merge window will be fired twice if watermark already passed the merge window
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/038eb1d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/038eb1d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/038eb1d0
Branch: refs/heads/release-1.5
Commit: 038eb1d02f720ddfda1854dcfb04789088c7c823
Parents: f84a164
Author: yuemeng <hz...@corp.netease.com>
Authored: Thu Apr 26 16:55:05 2018 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 14 10:10:42 2018 +0200
----------------------------------------------------------------------
.../streaming/api/windowing/triggers/EventTimeTrigger.java | 8 +++++++-
.../api/windowing/triggers/ProcessingTimeTrigger.java | 8 +++++++-
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/038eb1d0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 2f8f16f..2066bba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -69,7 +69,13 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
- ctx.registerEventTimeTimer(window.maxTimestamp());
+ // only register a timer if the watermark is not yet past the end of the merged window
+ // this is in line with the logic in onElement(). If the watermark is past the end of
+ // the window onElement() will fire and setting a timer here would fire the window twice.
+ long windowMaxTimestamp = window.maxTimestamp();
+ if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
+ ctx.registerEventTimeTimer(windowMaxTimestamp);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/038eb1d0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index cd7869e..c8e6e6c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -60,7 +60,13 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
- ctx.registerProcessingTimeTimer(window.maxTimestamp());
+ // only register a timer if the time is not yet past the end of the merged window
+ // this is in line with the logic in onElement(). If the time is past the end of
+ // the window onElement() will fire and setting a timer here would fire the window twice.
+ long windowMaxTimestamp = window.maxTimestamp();
+ if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
+ ctx.registerProcessingTimeTimer(windowMaxTimestamp);
+ }
}
@Override