You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/15 15:02:00 UTC

flink git commit: [hotfix] Make initial currentWatermark==Long.MIN_VALUE everywhere

Repository: flink
Updated Branches:
  refs/heads/master 50d8797bb -> 1b93b3242


[hotfix] Make initial currentWatermark==Long.MIN_VALUE everywhere


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b93b324
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b93b324
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b93b324

Branch: refs/heads/master
Commit: 1b93b32426f6cf6ef344880c4b45f78e983bd7de
Parents: 50d8797
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Apr 15 10:15:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Apr 15 15:01:08 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/operators/ExtractTimestampsOperator.java  | 2 +-
 .../streaming/runtime/operators/windowing/WindowOperator.java   | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b93b324/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index a3de138..a4815dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -57,7 +57,7 @@ public class ExtractTimestampsOperator<T>
 			registerTimer(System.currentTimeMillis() + watermarkInterval, this);
 		}
 
-		currentWatermark = -1L;
+		currentWatermark = Long.MIN_VALUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1b93b324/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index ce73e15..c106e70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -133,7 +133,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * To keep track of the current watermark so that we can immediately fire if a trigger
 	 * registers an event time callback for a timestamp that lies in the past.
 	 */
-	protected transient long currentWatermark = -1L;
+	protected transient long currentWatermark = Long.MIN_VALUE;
 
 	protected transient Context context = new Context(null, null);
 
@@ -216,6 +216,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindowsByKey = new HashMap<>();
 		}
+
+		currentWatermark = Long.MIN_VALUE;
+
 	}
 
 	@Override