You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 11:15:02 UTC
flink git commit: [hotfix] Fix processing time triggering on Window
Operator
Repository: flink
Updated Branches:
refs/heads/master cd7ed8e38 -> 3385fa3ab
[hotfix] Fix processing time triggering on Window Operator
Before it would only trigger if expectedTime < time. Now it is
expectedTime <= time.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3385fa3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3385fa3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3385fa3a
Branch: refs/heads/master
Commit: 3385fa3ab5584f74df93cb9597c0405d0be391f5
Parents: cd7ed8e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:13:37 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 11:14:29 2015 +0200
----------------------------------------------------------------------
.../runtime/operators/windowing/NonKeyedWindowOperator.java | 6 +++---
.../streaming/runtime/operators/windowing/WindowOperator.java | 5 +++--
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3385fa3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 101c818..a002b23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -304,15 +304,15 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- if (triggers.getKey() < time) {
+ long actualTime = triggers.getKey();
+ if (actualTime <= time) {
for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+ Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
processTriggerResult(triggerResult, context.window);
}
toRemove.add(triggers.getKey());
}
}
-
for (Long l: toRemove) {
processingTimeTimers.remove(l);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3385fa3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 04c393c..a80f971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -365,9 +365,10 @@ public class WindowOperator<K, IN, OUT, W extends Window>
Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- if (triggers.getKey() < time) {
+ long actualTime = triggers.getKey();
+ if (actualTime <= time) {
for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+ Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
processTriggerResult(triggerResult, context.key, context.window);
}
toRemove.add(triggers.getKey());