You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:50 UTC

[5/6] flink git commit: [FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()

[FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()

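Firing the timer immediately inside registerTimer() can lead to endless
recursion if the onTimer() method registers a timer whose timestamp is not
later than the current processing time (i.e. timestamp <= currentTime).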


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c8c0288
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c8c0288
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c8c0288

Branch: refs/heads/master
Commit: 8c8c02887a27cdc87bb019626f82ec03392ca8ce
Parents: 274cc41
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jan 10 14:15:42 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 11 10:35:47 2017 +0100

----------------------------------------------------------------------
 .../runtime/tasks/TestProcessingTimeService.java        | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c8c0288/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3c33ad3..b4e7e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -53,7 +53,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			}
 		});
 	}
-	
+
 	public void setCurrentTime(long timestamp) throws Exception {
 		this.currentTime = timestamp;
 
@@ -90,15 +90,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 		CallbackTask callbackTask = new CallbackTask(target);
 
-		if (timestamp <= currentTime) {
-			try {
-				callbackTask.onProcessingTime(timestamp);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
-		}
+		priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
 
 		return callbackTask;
 	}