You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 14:54:53 UTC

[1/4] flink git commit: [FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest

Repository: flink
Updated Branches:
  refs/heads/release-1.3 b67bc4da5 -> 60dec1160


[FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest

The previous implementation was overly complicated. Having separate
buffers for the StreamStatus and Watermarks is not required for our
tests. Also, that design doesn't allow checking the order in which
StreamStatus / Watermarks are emitted from a single input to the valve.

This commit reworks it by buffering both StreamStatus and Watermarks in
a shared queue.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dcd9fba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dcd9fba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dcd9fba

Branch: refs/heads/release-1.3
Commit: 4dcd9fba21eef157063d2d70b0e55869009f2954
Parents: b67bc4d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Sep 28 14:11:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 16:51:27 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValveTest.java  | 205 ++++++-------------
 1 file changed, 61 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4dcd9fba/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index b9651f9..0e4717f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -19,13 +19,14 @@
 package org.apache.flink.streaming.runtime.streamstatus;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.junit.Test;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
@@ -34,8 +35,7 @@ import static org.junit.Assert.assertTrue;
  * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a
  * deterministic behaviour. The unit tests here also test more complex stream status / watermark input cases.
  *
- * <p>
- * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call,
+ * <p>The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call,
  * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that
  * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of
  * input calls to the valve is trying to test is explained as inline comments within the tests.
@@ -57,21 +57,13 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 3);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
-
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
-
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
@@ -86,73 +78,57 @@ public class StatusWatermarkValveTest {
 		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
 		// no state change is toggled, therefore no stream status should be emitted
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// input some monotonously increasing watermarks while ACTIVE;
 		// the exact same watermarks should be emitted right after the inputs
 		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(25), 0);
-		assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 
 		// decreasing watermarks should not result in any output
 		valve.inputWatermark(new Watermark(18), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(42), 0);
-		assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(42), valveOutput.popLastSeenOutput());
 
 		// toggling ACTIVE to IDLE should result in an IDLE stream status output
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
 		// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
 		valve.inputWatermark(new Watermark(52), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(60), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
 
 		// no status change toggle while IDLE should result in stream status outputs
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
 
 		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
+
 
 		// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
 		// decreasing watermarks should therefore still be ignored, even after a status toggle
 		valve.inputWatermark(new Watermark(40), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// monotonously increasing watermarks after resuming to be ACTIVE should be output normally
 		valve.inputWatermark(new Watermark(68), 0);
-		assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(68), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(72), 0);
-		assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(72), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
@@ -170,17 +146,13 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(0), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(0), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(0), 2);
-		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that watermarks are output as soon as the overall min
@@ -188,30 +160,24 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(12), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(8), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(10), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(15), 1);
 		// lowest watermark across all channels is now channel 2, with watermark @ 10
-		assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that decreasing watermarks are ignored
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(6), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that when some input channel becomes idle, that channel will
@@ -224,25 +190,18 @@ public class StatusWatermarkValveTest {
 		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
 		// so the valve should output that
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(12), valveOutput.popLastSeenOutput());
 
 		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
 		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
 		valve.inputWatermark(new Watermark(17), 0);
-		assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(25), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(20), 1);
-		assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(20), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that after some channel resumes to be ACTIVE, it needs to
@@ -255,45 +214,34 @@ public class StatusWatermarkValveTest {
 		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
 		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// although watermarks for channel 2 will now be accepted, it still
 		// hasn't caught up with the overall min watermark (20)
 		valve.inputWatermark(new Watermark(18), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
 		valve.inputWatermark(new Watermark(22), 1);
-		assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(22), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(28), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(33), 1);
-		assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(28), valveOutput.popLastSeenOutput());
 
 		// now, channel 2 has caught up with the overall min watermark
 		valve.inputWatermark(new Watermark(30), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(31), 0);
 		// this acknowledges that channel 2's watermark is being accounted for again
-		assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(30), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(34), 2);
-		assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(31), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that once all channels are IDLE, the valve should also
@@ -303,19 +251,14 @@ public class StatusWatermarkValveTest {
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		// this is because once channel 0 becomes IDLE,
 		// the new min watermark will be 33 (channel 1)
-		assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(33), valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// now let all channels become idle; we should only see the idle marker be emitted, and nothing else
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
@@ -325,86 +268,60 @@ public class StatusWatermarkValveTest {
 
 		// let channel 0 resume to be ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 
 		// channel 0 is the only ACTIVE channel now, and is the only channel
 		// accounted for when advancing min watermark
 		valve.inputWatermark(new Watermark(36), 0);
-		assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(36), valveOutput.popLastSeenOutput());
 
 		// new also let channel 1 become ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// channel 1 is still behind overall min watermark
 		valve.inputWatermark(new Watermark(35), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// since channel 1 is still behind, channel 0 remains to be the only
 		// channel used to advance min watermark
 		valve.inputWatermark(new Watermark(37), 0);
-		assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(37), valveOutput.popLastSeenOutput());
 
 		// temporarily let channel 0 (the only active and aligned input) become idle;
 		// this should not result in any watermark or stream status output,
 		// because channel 1 is still active (therefore no stream status toggle) and
 		// at the same time not aligned (therefore should not produce any new min watermarks)
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// now, let channel 1 catch up with the overall min watermark
 		valve.inputWatermark(new Watermark(38), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(40), 0);
 		// this acknowledges that channel 1's watermark is being accounted for again
-		assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
 	}
 
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
-		private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>();
-		private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>();
+		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();
 
 		@Override
 		public void handleWatermark(Watermark watermark) {
-			outputWatermarks.add(watermark);
+			allOutputs.add(watermark);
 		}
 
 		@Override
 		public void handleStreamStatus(StreamStatus streamStatus) {
-			outputStreamStatuses.add(streamStatus);
-		}
-
-		public Watermark popLastOutputWatermark() {
-			return outputWatermarks.poll();
-		}
-
-		public StreamStatus popLastOutputStreamStatus() {
-			return outputStreamStatuses.poll();
-		}
-
-		public boolean hasNoOutputWatermarks() {
-			return outputWatermarks.size() == 0;
+			allOutputs.add(streamStatus);
 		}
 
-		public boolean hasNoOutputStreamStatuses() {
-			return outputStreamStatuses.size() == 0;
+		public StreamElement popLastSeenOutput() {
+			return allOutputs.poll();
 		}
 	}
 


[2/4] flink git commit: [FLINK-7728] [DataStream] Flush max watermark across all inputs once all become idle

Posted by al...@apache.org.
[FLINK-7728] [DataStream] Flush max watermark across all inputs once all become idle

Prior to this commit, once all inputs of the StatusWatermarkValve
become idle, we only emit the StreamStatus.IDLE marker and check
nothing else. This makes the watermark advancement behaviour
inconsistent when all inputs become idle, because the result depends
on the order in which they become idle.

This commit fixes this by "flushing" the max watermark across all
channels once all inputs become idle. At a high-level, what this means
for downstream operators is that all inputs have become idle and will
temporarily cease to advance their watermarks, so they can safely
advance their event time to whatever the largest watermark is.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2875260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2875260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2875260d

Branch: refs/heads/release-1.3
Commit: 2875260d067c1e6754a248b8e281516ad9b5a269
Parents: 4dcd9fb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Sep 28 16:49:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 16:51:30 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValve.java      | 23 +++++++++++
 .../streamstatus/StatusWatermarkValveTest.java  | 42 ++++++++++++++++++++
 2 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2875260d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index a257454..3dceb0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -130,6 +130,16 @@ public class StatusWatermarkValve {
 			// if all input channels of the valve are now idle, we need to output an idle stream
 			// status from the valve (this also marks the valve as idle)
 			if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {
+
+				// now that all input channels are idle and no channels will continue to advance its watermark,
+				// we should "flush" all watermarks across all channels; effectively, this means emitting
+				// the max watermark across all channels as the new watermark. Also, since we already try to advance
+				// the min watermark as channels individually become IDLE, here we only need to perform the flush
+				// if the watermark of the last active channel that just became idle is the current min watermark.
+				if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
+					findAndOutputMaxWatermarkAcrossAllChannels();
+				}
+
 				lastOutputStreamStatus = StreamStatus.IDLE;
 				outputHandler.handleStreamStatus(lastOutputStreamStatus);
 			} else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
@@ -177,6 +187,19 @@ public class StatusWatermarkValve {
 		}
 	}
 
+	private void findAndOutputMaxWatermarkAcrossAllChannels() {
+		long maxWatermark = Long.MIN_VALUE;
+
+		for (InputChannelStatus channelStatus : channelStatuses) {
+			maxWatermark = Math.max(channelStatus.watermark, maxWatermark);
+		}
+
+		if (maxWatermark > lastOutputWatermark) {
+			lastOutputWatermark = maxWatermark;
+			outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
+		}
+	}
+
 	/**
 	 * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream
 	 * status, and whether or not the channel's current watermark is aligned with the overall

http://git-wip-us.apache.org/repos/asf/flink/blob/2875260d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 0e4717f..70dc565 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -307,6 +307,48 @@ public class StatusWatermarkValveTest {
 		assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
 	}
 
+	/**
+	 * Tests that when all inputs become idle, the max watermark across all channels
+	 * is correctly "flushed" from the valve, as well as the stream status IDLE marker.
+	 */
+	@Test
+	public void testAllInputsBecomeIdleFlushMaxWatermarkAndStreamStatus() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		// -------------------------------------------------------------------------------------------
+		// Setup valve for test case:
+		//  channel #1: Watermark 10, ACTIVE
+		//  channel #2: Watermark 5, ACTIVE
+		//  channel #3: Watermark 3, ACTIVE
+		//  Min Watermark across channels = 3 (from channel #3)
+		// -------------------------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(10), 0);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		valve.inputWatermark(new Watermark(5), 1);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		valve.inputWatermark(new Watermark(3), 2);
+		assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
+
+		// -------------------------------------------------------------------------------------------
+		// Order of becoming IDLE:
+		//  channel #1 ----------------> channel #2 ----------------> channel #3
+		//   |-> (nothing emitted)        |-> (nothing emitted)        |-> Emit Watermark(10) & IDLE
+		// -------------------------------------------------------------------------------------------
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
 		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();
 


[3/4] flink git commit: [FLINK-7728] [DataStream] Make StatusWatermarkValve unit tests more fine-grained

Posted by al...@apache.org.
[FLINK-7728] [DataStream] Make StatusWatermarkValve unit tests more fine-grained

Previously, the unit tests in StatusWatermarkValveTest were too
cluttered and testing too many behaviours in a single test. This makes
it hard to have a good overview of what test cases are covered.

This commit is a rework of the previous tests, making them more
fine-grained so that the scope of each test is small enough. All
previously tested behaviours are still covered.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c528139c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c528139c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c528139c

Branch: refs/heads/release-1.3
Commit: c528139c3c2b33ba7bd11df154ba204408713b02
Parents: 2875260
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Sep 29 11:16:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 16:52:22 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValve.java      |  26 +-
 .../streamstatus/StatusWatermarkValveTest.java  | 366 ++++++++++---------
 2 files changed, 214 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c528139c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index 3dceb0a..5f1828f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.streaming.runtime.streamstatus;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to
@@ -40,6 +42,7 @@ public class StatusWatermarkValve {
 	 */
 	public interface ValveOutputHandler {
 		void handleWatermark(Watermark watermark);
+
 		void handleStreamStatus(StreamStatus streamStatus);
 	}
 
@@ -212,10 +215,11 @@ public class StatusWatermarkValve {
 	 *   caught up to the last output watermark from the valve yet.
 	 * </ul>
 	 */
-	private static class InputChannelStatus {
-		private long watermark;
-		private StreamStatus streamStatus;
-		private boolean isWatermarkAligned;
+	@VisibleForTesting
+	protected static class InputChannelStatus {
+		protected long watermark;
+		protected StreamStatus streamStatus;
+		protected boolean isWatermarkAligned;
 
 		/**
 		 * Utility to check if at least one channel in a given array of input channels is active.
@@ -230,4 +234,12 @@ public class StatusWatermarkValve {
 		}
 	}
 
+	@VisibleForTesting
+	protected InputChannelStatus getInputChannelStatus(int channelIndex) {
+		Preconditions.checkArgument(
+			channelIndex >= 0 && channelIndex < channelStatuses.length,
+			"Invalid channel index. Number of input channels: " + channelStatuses.length);
+
+		return channelStatuses[channelIndex];
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c528139c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 70dc565..fbf0622 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -19,14 +19,16 @@
 package org.apache.flink.streaming.runtime.streamstatus;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
-
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
 import org.junit.Test;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
@@ -43,276 +45,262 @@ import static org.junit.Assert.assertEquals;
 public class StatusWatermarkValveTest {
 
 	/**
-	 * Tests that all input channels of a valve start as ACTIVE stream status.
+	 * Tests that watermarks correctly advance with increasing watermarks for a single input valve.
 	 */
 	@Test
-	public void testAllInputChannelsStartAsActive() {
+	public void testSingleInputIncreasingWatermarks() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
-		StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput);
-
-		// ------------------------------------------------------------------------
-		//  Ensure that the valve will output an IDLE stream status as soon as
-		//  all input channels become IDLE; this also implicitly ensures that
-		//  all input channels start as ACTIVE.
-		// ------------------------------------------------------------------------
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		valve.inputStreamStatus(StreamStatus.IDLE, 3);
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		valve.inputWatermark(new Watermark(0), 0);
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
-	 * Tests that valves work as expected when they handle only 1 input channel.
-	 * Tested behaviours are explained as inline comments.
+	 * Tests that watermarks do not advance with decreasing watermark inputs for a single input valve.
 	 */
 	@Test
-	public void testOneInputValve() {
+	public void testSingleInputDecreasingWatermarksYieldsNoOutput() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
-		// no state change is toggled, therefore no stream status should be emitted
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
-		// input some monotonously increasing watermarks while ACTIVE;
-		// the exact same watermarks should be emitted right after the inputs
-		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(25), 0);
 		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 
-		// decreasing watermarks should not result in any output
 		valve.inputWatermark(new Watermark(18), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(42), 0);
 		assertEquals(new Watermark(42), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
+	/**
+	 * Tests that stream status toggling works correctly, as well as that non-toggling status
+	 * inputs do not yield output for a single input valve.
+	 */
+	@Test
+	public void testSingleInputStreamStatusToggling() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
+
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		// this also implicitly verifies that input channels start as ACTIVE
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// toggling ACTIVE to IDLE should result in an IDLE stream status output
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
-		// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
-		valve.inputWatermark(new Watermark(52), 0);
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(60), 0);
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
+	/**
+	 * Tests that the watermark of an input channel remains intact while in the IDLE status.
+	 */
+	@Test
+	public void testSingleInputWatermarksIntactDuringIdleness() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
 
-		// no status change toggle while IDLE should result in stream status outputs
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+
+		valve.inputWatermark(new Watermark(50), 0);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+		assertEquals(25, valve.getInputChannelStatus(0).watermark);
 
-		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
 		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
-
-
-		// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
-		// decreasing watermarks should therefore still be ignored, even after a status toggle
-		valve.inputWatermark(new Watermark(40), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// monotonously increasing watermarks after resuming to be ACTIVE should be output normally
-		valve.inputWatermark(new Watermark(68), 0);
-		assertEquals(new Watermark(68), valveOutput.popLastSeenOutput());
-
-		valve.inputWatermark(new Watermark(72), 0);
-		assertEquals(new Watermark(72), valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(50), 0);
+		assertEquals(new Watermark(50), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
-	 * Tests that valves work as expected when they handle multiple input channels (tested with 3).
-	 * Tested behaviours are explained as inline comments.
+	 * Tests that the valve yields a watermark only when all inputs have received a watermark.
 	 */
 	@Test
-	public void testMultipleInputValve() {
+	public void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// ------------------------------------------------------------------------
-		//  Ensure that watermarks are output only when all
-		//  channels have been input some watermark.
-		// ------------------------------------------------------------------------
-
 		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(0), 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
+		// now, all channels have watermarks
 		valve.inputWatermark(new Watermark(0), 2);
 		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// ------------------------------------------------------------------------
-		//  Ensure that watermarks are output as soon as the overall min
-		//  watermark across all channels have advanced.
-		// ------------------------------------------------------------------------
+	/**
+	 * Tests that a new min watermark is emitted from the valve as soon as the overall
+	 * min watermark across inputs advances.
+	 */
+	@Test
+	public void testMultipleInputIncreasingWatermarks() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		valve.inputWatermark(new Watermark(12), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(0), 0);
+		valve.inputWatermark(new Watermark(0), 1);
+		valve.inputWatermark(new Watermark(0), 2);
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
+		valve.inputWatermark(new Watermark(12), 0);
 		valve.inputWatermark(new Watermark(8), 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(10), 2);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(15), 1);
 		// lowest watermark across all channels is now channel 2, with watermark @ 10
 		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that decreasing watermarks are ignored
-		// ------------------------------------------------------------------------
-
-		valve.inputWatermark(new Watermark(6), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// ------------------------------------------------------------------------
-		//  Ensure that when some input channel becomes idle, that channel will
-		//  no longer be accounted for when advancing the watermark.
-		// ------------------------------------------------------------------------
-
-		// marking channel 2 as IDLE shouldn't result in overall status toggle for the valve,
-		// because there are still other active channels (0 and 1), so there should not be any
-		// stream status outputs;
-		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
-		// so the valve should output that
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		valve.inputWatermark(new Watermark(17), 2);
+		// lowest watermark across all channels is now channel 0, with watermark @ 12
 		assertEquals(new Watermark(12), valveOutput.popLastSeenOutput());
-
-		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
-		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
-		valve.inputWatermark(new Watermark(17), 0);
-		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
-
-		valve.inputWatermark(new Watermark(25), 0);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(20), 1);
-		assertEquals(new Watermark(20), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that after some channel resumes to be ACTIVE, it needs to
-		//  "catch up" with the current overall min watermark before it can be
-		//  accounted for again when finding the min watermark across channels.
-		//  Also tests that before the resumed channel catches up, the overall
-		//  min watermark can still advance with watermarks of other channels.
-		// ------------------------------------------------------------------------
-
-		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
-		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		valve.inputWatermark(new Watermark(20), 0);
+		// lowest watermark across all channels is now channel 1, with watermark @ 15
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// although watermarks for channel 2 will now be accepted, it still
-		// hasn't caught up with the overall min watermark (20)
-		valve.inputWatermark(new Watermark(18), 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that for a multiple input valve, decreasing watermarks will yield no output.
+	 */
+	@Test
+	public void testMultipleInputDecreasingWatermarksYieldsNoOutput() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
-		valve.inputWatermark(new Watermark(22), 1);
-		assertEquals(new Watermark(22), valveOutput.popLastSeenOutput());
+		valve.inputWatermark(new Watermark(25), 0);
+		valve.inputWatermark(new Watermark(10), 1);
+		valve.inputWatermark(new Watermark(17), 2);
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(28), 0);
+		valve.inputWatermark(new Watermark(12), 0);
+		valve.inputWatermark(new Watermark(8), 1);
+		valve.inputWatermark(new Watermark(15), 2);
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		valve.inputWatermark(new Watermark(33), 1);
-		assertEquals(new Watermark(28), valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that stream status toggling works correctly, as well as that non-toggling status
+	 * inputs do not yield output for a multiple input valve.
+	 */
+	@Test
+	public void testMultipleInputStreamStatusToggling() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(2, valveOutput);
 
-		// now, channel 2 has caught up with the overall min watermark
-		valve.inputWatermark(new Watermark(30), 2);
+		// this also implicitly verifies that all input channels start as active
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(31), 0);
-		// this acknowledges that channel 2's watermark is being accounted for again
-		assertEquals(new Watermark(30), valveOutput.popLastSeenOutput());
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(34), 2);
-		assertEquals(new Watermark(31), valveOutput.popLastSeenOutput());
-
-		// ------------------------------------------------------------------------
-		//  Ensure that once all channels are IDLE, the valve should also
-		//  determine itself to be IDLE and output a IDLE stream status
-		// ------------------------------------------------------------------------
-
+		// now, all channels are IDLE
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		// this is because once channel 0 becomes IDLE,
-		// the new min watermark will be 33 (channel 1)
-		assertEquals(new Watermark(33), valveOutput.popLastSeenOutput());
-
-		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
-		// now let all channels become idle; we should only see the idle marker be emitted, and nothing else
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// ------------------------------------------------------------------------
-		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
-		//  still hold. Also ensure that as soon as one of the input channels
-		//  become ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status.
-		// ------------------------------------------------------------------------
+		// as soon as at least one input becomes active again, the ACTIVE marker should be forwarded
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 
-		// let channel 0 resume to be ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
+		// already back to ACTIVE, should yield no output
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// channel 0 is the only ACTIVE channel now, and is the only channel
-		// accounted for when advancing min watermark
-		valve.inputWatermark(new Watermark(36), 0);
-		assertEquals(new Watermark(36), valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that for multiple inputs, when some inputs are idle, the min watermark
+	 * is correctly computed and advanced from the remaining active inputs.
+	 */
+	@Test
+	public void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
-		// new also let channel 1 become ACTIVE
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		valve.inputWatermark(new Watermark(15), 0);
+		valve.inputWatermark(new Watermark(10), 1);
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// channel 1 is still behind overall min watermark
-		valve.inputWatermark(new Watermark(35), 1);
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		// min watermark should be computed from remaining ACTIVE channels
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		// since channel 1 is still behind, channel 0 remains to be the only
-		// channel used to advance min watermark
-		valve.inputWatermark(new Watermark(37), 0);
-		assertEquals(new Watermark(37), valveOutput.popLastSeenOutput());
-
-		// temporarily let channel 0 (the only active and aligned input) become idle;
-		// this should not result in any watermark or stream status output,
-		// because channel 1 is still active (therefore no stream status toggle) and
-		// at the same time not aligned (therefore should not produce any new min watermarks)
-		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputWatermark(new Watermark(18), 1);
+		// now, min watermark should be 15 from channel #0
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
 
-		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		valve.inputWatermark(new Watermark(20), 0);
+		// now, min watermark should be 18 from channel #1
+		assertEquals(new Watermark(18), valveOutput.popLastSeenOutput());
 		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
 
-		// now, let channel 1 catch up with the overall min watermark
-		valve.inputWatermark(new Watermark(38), 1);
-		assertEquals(null, valveOutput.popLastSeenOutput());
+	/**
+	 * Tests that as input channels individually and gradually become idle, watermarks
+	 * are output as soon as the remaining active channels can yield a new min watermark.
+	 */
+	@Test
+	public void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		valve.inputWatermark(new Watermark(25), 0);
+		valve.inputWatermark(new Watermark(10), 1);
+		valve.inputWatermark(new Watermark(17), 2);
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		// only channels 0 & 2 are ACTIVE; 17 is the overall min watermark now
+		assertEquals(new Watermark(17), valveOutput.popLastSeenOutput());
 
-		valve.inputWatermark(new Watermark(40), 0);
-		// this acknowledges that channel 1's watermark is being accounted for again
-		assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		// only channel 0 is ACTIVE; 25 is the overall min watermark now
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
 	 * Tests that when all inputs become idle, the max watermark across all channels
 	 * is correctly "flushed" from the valve, as well as the stream status IDLE marker.
+	 *
+	 * <p>This test along with {@link #testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle}
+	 * should completely verify that the eventual watermark advancement result when all inputs become idle
+	 * is independent of the order that the inputs become idle.
 	 */
 	@Test
-	public void testAllInputsBecomeIdleFlushMaxWatermarkAndStreamStatus() {
+	public void testMultipleInputFlushMaxWatermarkAndStreamStatusOnceAllInputsBecomeIdle() {
 		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
 		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
 
@@ -325,11 +313,7 @@ public class StatusWatermarkValveTest {
 		// -------------------------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(10), 0);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(5), 1);
-		assertEquals(null, valveOutput.popLastSeenOutput());
-
 		valve.inputWatermark(new Watermark(3), 2);
 		assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
 
@@ -349,6 +333,46 @@ public class StatusWatermarkValveTest {
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
+	/**
+	 * Tests that when idle channels become active again, they need to "catch up" with
+	 * the latest watermark before they are considered for min watermark computation again.
+	 */
+	@Test
+	public void testMultipleInputWatermarkRealignmentAfterResumeActive() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		valve.inputWatermark(new Watermark(10), 0);
+		valve.inputWatermark(new Watermark(7), 1);
+		valve.inputWatermark(new Watermark(3), 2);
+		assertEquals(new Watermark(3), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(7), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// let channel 2 become active again; since the min watermark has now advanced to 7,
+		// channel 2 should have been marked as non-aligned.
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		assertFalse(valve.getInputChannelStatus(2).isWatermarkAligned);
+
+		// during the realignment process, watermarks should still be accepted by channel 2 (but shouldn't yield new watermarks)
+		valve.inputWatermark(new Watermark(5), 2);
+		assertEquals(5, valve.getInputChannelStatus(2).watermark);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// let channel 2 catch up with the min watermark; now should be realigned
+		valve.inputWatermark(new Watermark(9), 2);
+		assertTrue(valve.getInputChannelStatus(2).isWatermarkAligned);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// check that the realigned input is now taken into account for watermark advancement
+		valve.inputWatermark(new Watermark(12), 1);
+		assertEquals(new Watermark(9), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
 		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();
 


[4/4] flink git commit: [FLINK-7721] Verify StatusWatermarkValve only emits WM iff it has aligned inputs

Posted by al...@apache.org.
[FLINK-7721] Verify StatusWatermarkValve only emits WM iff it has aligned inputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60dec116
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60dec116
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60dec116

Branch: refs/heads/release-1.3
Commit: 60dec1160179d067b022b82dd34042ba3530f410
Parents: c528139
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 2 12:28:10 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 16:52:25 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValveTest.java  | 30 ++++++++++++++++++++
 1 file changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60dec116/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index fbf0622..1d972f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -373,6 +373,36 @@ public class StatusWatermarkValveTest {
 		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
+	/**
+	 * Verify that we don't see any state changes/watermarks when all ACTIVE channels are unaligned.
+	 * Earlier versions of the valve had a bug that would cause it to emit a {@code Long.MAX_VALUE}
+	 * watermark in that case.
+	 */
+	@Test
+	public void testNoOutputWhenAllActiveChannelsAreUnaligned() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		valve.inputWatermark(new Watermark(10), 0);
+		valve.inputWatermark(new Watermark(7), 1);
+
+		// make channel 2 IDLE; the min watermark is now computed from channels 0 and 1 only
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(7), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// make channel 2 ACTIVE again, it is still unaligned
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
+		// make channel 0 and 1 IDLE, now channel 2 is the only ACTIVE channel but it's unaligned
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+
+		// we should not see any output
+		assertEquals(null, valveOutput.popLastSeenOutput());
+	}
+
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
 		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();