You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 14:46:49 UTC

[4/4] flink git commit: [FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest

[FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest

The previous implementation was overly complicated. Having separate
buffers for the StreamStatus and Watermarks is not required for our
tests. Also, that design doesn't allow checking the order in which
StreamStatus / Watermarks are emitted from a single input to the valve.

This commit reworks it by buffering both StreamStatus and Watermarks in
a shared queue.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6481564c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6481564c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6481564c

Branch: refs/heads/master
Commit: 6481564cb581164a761444922ddfb4ad11a8ef55
Parents: c81a7e5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Sep 28 14:11:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 16:42:14 2017 +0200

----------------------------------------------------------------------
 .../streamstatus/StatusWatermarkValveTest.java  | 200 ++++++-------------
 1 file changed, 59 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6481564c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 12fa56d..4f5e874 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.streamstatus;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.junit.Test;
 
 import java.util.concurrent.BlockingQueue;
@@ -57,21 +58,13 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 3);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
-
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
-
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
@@ -86,73 +79,57 @@ public class StatusWatermarkValveTest {
 		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
 		// no state change is toggled, therefore no stream status should be emitted
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// input some monotonously increasing watermarks while ACTIVE;
 		// the exact same watermarks should be emitted right after the inputs
 		valve.inputWatermark(new Watermark(0), 0);
-		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(25), 0);
-		assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());
 
 		// decreasing watermarks should not result in any output
 		valve.inputWatermark(new Watermark(18), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(42), 0);
-		assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(42), valveOutput.popLastSeenOutput());
 
 		// toggling ACTIVE to IDLE should result in an IDLE stream status output
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
 		// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
 		valve.inputWatermark(new Watermark(52), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(60), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
 
 		// no status change toggle while IDLE should result in stream status outputs
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
+
 
 		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
+
 
 		// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
 		// decreasing watermarks should therefore still be ignored, even after a status toggle
 		valve.inputWatermark(new Watermark(40), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// monotonously increasing watermarks after resuming to be ACTIVE should be output normally
 		valve.inputWatermark(new Watermark(68), 0);
-		assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(68), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(72), 0);
-		assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(72), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 	}
 
 	/**
@@ -170,17 +147,13 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(0), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(0), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(0), 2);
-		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that watermarks are output as soon as the overall min
@@ -188,30 +161,24 @@ public class StatusWatermarkValveTest {
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(12), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(8), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(10), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(15), 1);
 		// lowest watermark across all channels is now channel 2, with watermark @ 10
-		assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that decreasing watermarks are ignored
 		// ------------------------------------------------------------------------
 
 		valve.inputWatermark(new Watermark(6), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that when some input channel becomes idle, that channel will
@@ -224,25 +191,18 @@ public class StatusWatermarkValveTest {
 		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
 		// so the valve should output that
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(12), valveOutput.popLastSeenOutput());
 
 		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
 		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
 		valve.inputWatermark(new Watermark(17), 0);
-		assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(25), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(20), 1);
-		assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(20), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that after some channel resumes to be ACTIVE, it needs to
@@ -255,45 +215,34 @@ public class StatusWatermarkValveTest {
 		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
 		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// although watermarks for channel 2 will now be accepted, it still
 		// hasn't caught up with the overall min watermark (20)
 		valve.inputWatermark(new Watermark(18), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
 		valve.inputWatermark(new Watermark(22), 1);
-		assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(22), valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(28), 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(33), 1);
-		assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(28), valveOutput.popLastSeenOutput());
 
 		// now, channel 2 has caught up with the overall min watermark
 		valve.inputWatermark(new Watermark(30), 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(31), 0);
 		// this acknowledges that channel 2's watermark is being accounted for again
-		assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(30), valveOutput.popLastSeenOutput());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(34), 2);
-		assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(31), valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that once all channels are IDLE, the valve should also
@@ -303,19 +252,14 @@ public class StatusWatermarkValveTest {
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
 		// this is because once channel 0 becomes IDLE,
 		// the new min watermark will be 33 (channel 1)
-		assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(33), valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.IDLE, 2);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// now let all channels become idle; we should only see the idle marker be emitted, and nothing else
 		valve.inputStreamStatus(StreamStatus.IDLE, 1);
-		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
 
 		// ------------------------------------------------------------------------
 		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
@@ -325,86 +269,60 @@ public class StatusWatermarkValveTest {
 
 		// let channel 0 resume to be ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());
 
 		// channel 0 is the only ACTIVE channel now, and is the only channel
 		// accounted for when advancing min watermark
 		valve.inputWatermark(new Watermark(36), 0);
-		assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(36), valveOutput.popLastSeenOutput());
 
 		// new also let channel 1 become ACTIVE
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// channel 1 is still behind overall min watermark
 		valve.inputWatermark(new Watermark(35), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// since channel 1 is still behind, channel 0 remains to be the only
 		// channel used to advance min watermark
 		valve.inputWatermark(new Watermark(37), 0);
-		assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(37), valveOutput.popLastSeenOutput());
 
 		// temporarily let channel 0 (the only active and aligned input) become idle;
 		// this should not result in any watermark or stream status output,
 		// because channel 1 is still active (therefore no stream status toggle) and
 		// at the same time not aligned (therefore should not produce any new min watermarks)
 		valve.inputStreamStatus(StreamStatus.IDLE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		// now, let channel 1 catch up with the overall min watermark
 		valve.inputWatermark(new Watermark(38), 1);
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(null, valveOutput.popLastSeenOutput());
 
 		valve.inputWatermark(new Watermark(40), 0);
 		// this acknowledges that channel 1's watermark is being accounted for again
-		assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark());
-		assertTrue(valveOutput.hasNoOutputWatermarks());
-		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+		assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
 	}
 
 	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
-		private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>();
-		private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>();
+		private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();
 
 		@Override
 		public void handleWatermark(Watermark watermark) {
-			outputWatermarks.add(watermark);
+			allOutputs.add(watermark);
 		}
 
 		@Override
 		public void handleStreamStatus(StreamStatus streamStatus) {
-			outputStreamStatuses.add(streamStatus);
-		}
-
-		public Watermark popLastOutputWatermark() {
-			return outputWatermarks.poll();
-		}
-
-		public StreamStatus popLastOutputStreamStatus() {
-			return outputStreamStatuses.poll();
-		}
-
-		public boolean hasNoOutputWatermarks() {
-			return outputWatermarks.size() == 0;
+			allOutputs.add(streamStatus);
 		}
 
-		public boolean hasNoOutputStreamStatuses() {
-			return outputStreamStatuses.size() == 0;
+		public StreamElement popLastSeenOutput() {
+			return allOutputs.poll();
 		}
 	}