You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/11/24 10:33:52 UTC

flink git commit: [FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window

Repository: flink
Updated Branches:
  refs/heads/master 16ee4a5ce -> 2029c14eb


[FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window

This changes the ContinuousEventTimeTrigger to behave like the
EventTimeTrigger in the sense that it also triggers at the end of the
window.

This prevents the trigger from not firing at all in case the first
trigger interval is after the window end.

This closes #2860.

[typo] fix toString() of ContinuousEventTimeTrigger

This closes #2854.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2029c14e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2029c14e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2029c14e

Branch: refs/heads/master
Commit: 2029c14eb7d5b6788eb9297c271dd16a0d819fbe
Parents: 16ee4a5
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Nov 23 16:01:35 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Nov 24 11:33:14 2016 +0100

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    | 22 +++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2029c14e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index fa5bb2f..02c2a42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -52,31 +52,39 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
 
-		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+			// if the watermark is already past the window fire immediately
+			return TriggerResult.FIRE;
+		} else {
+			ctx.registerEventTimeTimer(window.maxTimestamp());
+		}
 
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 		if (fireTimestamp.get() == null) {
 			long start = timestamp - (timestamp % interval);
 			long nextFireTimestamp = start + interval;
-
 			ctx.registerEventTimeTimer(nextFireTimestamp);
-
 			fireTimestamp.add(nextFireTimestamp);
-			return TriggerResult.CONTINUE;
 		}
+
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
 	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
-		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
+		if (time == window.maxTimestamp()){
+			return TriggerResult.FIRE;
+		}
+
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 		if (fireTimestamp.get().equals(time)) {
 			fireTimestamp.clear();
 			fireTimestamp.add(time + interval);
 			ctx.registerEventTimeTimer(time + interval);
 			return TriggerResult.FIRE;
-
 		}
+
 		return TriggerResult.CONTINUE;
 	}
 
@@ -112,7 +120,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
 	@Override
 	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + interval + ")";
+		return "ContinuousEventTimeTrigger(" + interval + ")";
 	}
 
 	@VisibleForTesting