You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 09:23:41 UTC

flink git commit: [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs

Repository: flink
Updated Branches:
  refs/heads/release-1.3 43d76f008 -> b67bc4da5


[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs

Prior to this commit, in the calculation of the new min watermark in
StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
there was no verification that the calculated new min watermark
really was aggregated from at least one aligned channel.

In the corner case where no input channel is currently aligned, but
some channels are actually still active, we would then incorrectly
determine the final aggregation to be Long.MAX_VALUE (the initial value
of the min computation) and emit that as the new watermark.

This commit fixes this by emitting the aggregated watermark only if it
was really calculated from at least one aligned input channel (in
addition to the already existing constraint that it must be larger than
the last emitted watermark). This change should also safely cover the
case where Long.MAX_VALUE was genuinely aggregated from the input
channels.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b67bc4da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b67bc4da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b67bc4da

Branch: refs/heads/release-1.3
Commit: b67bc4da568df08f5a27f5a84546b54c85b5bb7a
Parents: 43d76f0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 27 20:05:21 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 10:36:00 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValve.java      |  9 ++++++---
 .../streamstatus/StatusWatermarkValveTest.java  | 21 ++++++++++++++++----
 2 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b67bc4da/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index 487fd8a..a257454 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -159,16 +159,19 @@ public class StatusWatermarkValve {
 
 	private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
 		long newMinWatermark = Long.MAX_VALUE;
+		boolean hasAlignedChannels = false;
 
 		// determine new overall watermark by considering only watermark-aligned channels across all channels
 		for (InputChannelStatus channelStatus : channelStatuses) {
 			if (channelStatus.isWatermarkAligned) {
+				hasAlignedChannels = true;
 				newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
 			}
 		}
 
-		// we acknowledge and output the new overall watermark if it is larger than the last output watermark
-		if (newMinWatermark > lastOutputWatermark) {
+		// we acknowledge and output the new overall watermark if it really is aggregated
+		// from some remaining aligned channel, and is also larger than the last output watermark
+		if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
 			lastOutputWatermark = newMinWatermark;
 			outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
 		}
@@ -183,7 +186,7 @@ public class StatusWatermarkValve {
 	 * <ul>
 	 *   <li>the current stream status of the channel is idle
 	 *   <li>the stream status has resumed to be active, but the watermark of the channel hasn't
-	 *   caught up to thelast output watermark from the valve yet.
+	 *   caught up to the last output watermark from the valve yet.
 	 * </ul>
 	 */
 	private static class InputChannelStatus {

http://git-wip-us.apache.org/repos/asf/flink/blob/b67bc4da/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 564901f..b9651f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -246,7 +246,7 @@ public class StatusWatermarkValveTest {
 
 		// ------------------------------------------------------------------------
 		//  Ensure that after some channel resumes to be ACTIVE, it needs to
-		//  catch up" with the current overall min watermark before it can be
+		//  "catch up" with the current overall min watermark before it can be
 		//  accounted for again when finding the min watermark across channels.
 		//  Also tests that before the resumed channel catches up, the overall
 		//  min watermark can still advance with watermarks of other channels.
@@ -297,7 +297,7 @@ public class StatusWatermarkValveTest {
 
 		// ------------------------------------------------------------------------
 		//  Ensure that once all channels are IDLE, the valve should also
-		//  determine itself to be IDLE output a IDLE stream status
+		//  determine itself to be IDLE and output a IDLE stream status
 		// ------------------------------------------------------------------------
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
@@ -311,13 +311,14 @@ public class StatusWatermarkValveTest {
 		assertTrue(valveOutput.hasNoOutputWatermarks());
 		assertTrue(valveOutput.hasNoOutputStreamStatuses());
 
+		// now let all channels become idle; we should only see the idle marker be emitted, and nothing else
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
 		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
 		assertTrue(valveOutput.hasNoOutputWatermarks());
 		assertTrue(valveOutput.hasNoOutputStreamStatuses());
 
 		// ------------------------------------------------------------------------
-		//  Ensure that channels gradually become ACTIVE again, the above behaviours
+		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
 		//  still hold. Also ensure that as soon as one of the input channels
 		//  become ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status.
 		// ------------------------------------------------------------------------
@@ -352,7 +353,19 @@ public class StatusWatermarkValveTest {
 		assertTrue(valveOutput.hasNoOutputWatermarks());
 		assertTrue(valveOutput.hasNoOutputStreamStatuses());
 
-		// now, channel 1 has caught up with the overall min watermark
+		// temporarily let channel 0 (the only active and aligned input) become idle;
+		// this should not result in any watermark or stream status output,
+		// because channel 1 is still active (therefore no stream status toggle) and
+		// at the same time not aligned (therefore should not produce any new min watermarks)
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now, let channel 1 catch up with the overall min watermark
 		valve.inputWatermark(new Watermark(38), 1);
 		assertTrue(valveOutput.hasNoOutputWatermarks());
 		assertTrue(valveOutput.hasNoOutputStreamStatuses());