You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/11/17 14:36:00 UTC

flink git commit: [FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour

Repository: flink
Updated Branches:
  refs/heads/master 14c24e797 -> bdb306a11


[FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour

Previously the internal currentWatermark would be updated even if the
value returned from getCurrentWatermark was lower than the current
watermark.

This can lead to problems with chaining because the watermark is
directly forwarded without going through the watermark logic that
ensures correct behaviour (monotonically increasing).

This adds a test that verifies that the timestamp extractor does not
emit decreasing watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdb306a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdb306a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdb306a1

Branch: refs/heads/master
Commit: bdb306a1152aa9938bde2cc536db540eb2ca40d0
Parents: 14c24e7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 17 11:40:22 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 17 11:40:22 2015 +0100

----------------------------------------------------------------------
 .../operators/ExtractTimestampsOperator.java    |  6 +-
 .../streaming/timestamp/TimestampITCase.java    | 76 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdb306a1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 6e51a49..9c27c6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -79,10 +79,10 @@ public class ExtractTimestampsOperator<T>
 	public void trigger(long timestamp) throws Exception {
 		// register next timer
 		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-		long lastWatermark = currentWatermark;
-		currentWatermark = userFunction.getCurrentWatermark();
+		long newWatermark = userFunction.getCurrentWatermark();
 
-		if (currentWatermark > lastWatermark) {
+		if (newWatermark > currentWatermark) {
+			currentWatermark = newWatermark;
 			// emit watermark
 			output.emitWatermark(new Watermark(currentWatermark));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdb306a1/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 749e1dd..5113b45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -379,6 +380,73 @@ public class TimestampITCase {
 	}
 
 	/**
+	 * This test verifies that the timestamp extractor does not emit decreasing watermarks even
+	 * if the user-defined TimestampExtractor returns watermarks lower than the last emitted one.
+	 */
+	@Test
+	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+		env.getConfig().setAutoWatermarkInterval(1);
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collect(index);
+					Thread.sleep(100);
+					ctx.collect(index - 1);
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		});
+
+		source1.assignTimestamps(new TimestampExtractor<Integer>() {
+			@Override
+			public long extractTimestamp(Integer element, long currentTimestamp) {
+				return element;
+			}
+
+			@Override
+			public long extractWatermark(Integer element, long currentTimestamp) {
+				return element - 1;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
+			Assert.fail("Wrong watermark.");
+		}
+	}
+
+	/**
 	 * This tests whether the program throws an exception when an event-time source tries
 	 * to emit without timestamp.
 	 */
@@ -442,6 +510,10 @@ public class TimestampITCase {
 		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
 		private long oldTimestamp;
 
+		public CustomOperator() {
+			setChainingStrategy(ChainingStrategy.ALWAYS);
+		}
+
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
 			if (element.getTimestamp() != element.getValue()) {
@@ -473,6 +545,10 @@ public class TimestampITCase {
 
 	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
+		public TimestampCheckingOperator() {
+			setChainingStrategy(ChainingStrategy.ALWAYS);
+		}
+
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
 			if (element.getTimestamp() != element.getValue()) {