You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:50 UTC
[5/6] flink git commit: [FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()
[FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()
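Immediately firing the timer can lead to endless recursion if the
onTimer() method sets a timer for the past. With this change,
registerTimer() always adds the timer to the priority queue instead of
invoking the callback directly when the timestamp is not in the future.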
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c8c0288
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c8c0288
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c8c0288
Branch: refs/heads/master
Commit: 8c8c02887a27cdc87bb019626f82ec03392ca8ce
Parents: 274cc41
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jan 10 14:15:42 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 11 10:35:47 2017 +0100
----------------------------------------------------------------------
.../runtime/tasks/TestProcessingTimeService.java | 12 ++----------
1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c8c0288/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3c33ad3..b4e7e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -53,7 +53,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
}
});
}
-
+
public void setCurrentTime(long timestamp) throws Exception {
this.currentTime = timestamp;
@@ -90,15 +90,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
CallbackTask callbackTask = new CallbackTask(target);
- if (timestamp <= currentTime) {
- try {
- callbackTask.onProcessingTime(timestamp);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
- }
+ priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
return callbackTask;
}