You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 11:15:02 UTC

flink git commit: [hotfix] Fix processing time triggering on Window Operator

Repository: flink
Updated Branches:
  refs/heads/master cd7ed8e38 -> 3385fa3ab


[hotfix] Fix processing time triggering on Window Operator

Before it would only trigger if expectedTime < time. Now it is
expectedTime <= time.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3385fa3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3385fa3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3385fa3a

Branch: refs/heads/master
Commit: 3385fa3ab5584f74df93cb9597c0405d0be391f5
Parents: cd7ed8e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:13:37 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 11:14:29 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/windowing/NonKeyedWindowOperator.java    | 6 +++---
 .../streaming/runtime/operators/windowing/WindowOperator.java  | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3385fa3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 101c818..a002b23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -304,15 +304,15 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			if (triggers.getKey() < time) {
+			long actualTime = triggers.getKey();
+			if (actualTime <= time) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
 					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
-
 		for (Long l: toRemove) {
 			processingTimeTimers.remove(l);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3385fa3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 04c393c..a80f971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -365,9 +365,10 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			if (triggers.getKey() < time) {
+			long actualTime = triggers.getKey();
+			if (actualTime <= time) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
 					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());