You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/10 14:50:03 UTC

[1/2] flink git commit: [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

Repository: flink
Updated Branches:
  refs/heads/master 02410324b -> 66305135b


http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
new file mode 100644
index 0000000..564901f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
+ * and {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest} may also implicitly test {@link StatusWatermarkValve}
+ * and that valves are correctly used in the tasks' input processors, the unit tests here additionally make sure that
+ * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a
+ * deterministic manner. The unit tests here also test more complex stream status / watermark input cases.
+ *
+ * <p>
+ * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call,
+ * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that
+ * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of
+ * input calls to the valve is trying to test are explained as inline comments within the tests.
+ */
+public class StatusWatermarkValveTest {
+
+	/**
+	 * Tests that all input channels of a valve start as ACTIVE stream status.
+	 */
+	@Test
+	public void testAllInputChannelsStartAsActive() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput);
+
+		// ------------------------------------------------------------------------
+		//  Ensure that the valve will output an IDLE stream status as soon as
+		//  all input channels become IDLE; this also implicitly ensures that
+		//  all input channels start as ACTIVE.
+		// ------------------------------------------------------------------------
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 3);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	/**
+	 * Tests that valves work as expected when they handle only 1 input channel.
+	 * Tested behaviours are explained as inline comments.
+	 */
+	@Test
+	public void testOneInputValve() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput);
+
+		// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
+		// no state change is toggled, therefore no stream status should be emitted
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// input some monotonically increasing watermarks while ACTIVE;
+		// the exact same watermarks should be emitted right after the inputs
+		valve.inputWatermark(new Watermark(0), 0);
+		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(25), 0);
+		assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// decreasing watermarks should not result in any output
+		valve.inputWatermark(new Watermark(18), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(42), 0);
+		assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// toggling ACTIVE to IDLE should result in an IDLE stream status output
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
+		valve.inputWatermark(new Watermark(52), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(60), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// no status change toggle while IDLE should not result in any stream status outputs
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
+		// decreasing watermarks should therefore still be ignored, even after a status toggle
+		valve.inputWatermark(new Watermark(40), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// monotonically increasing watermarks after resuming to be ACTIVE should be output normally
+		valve.inputWatermark(new Watermark(68), 0);
+		assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(72), 0);
+		assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	/**
+	 * Tests that valves work as expected when they handle multiple input channels (tested with 3).
+	 * Tested behaviours are explained as inline comments.
+	 */
+	@Test
+	public void testMultipleInputValve() {
+		BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler();
+		StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput);
+
+		// ------------------------------------------------------------------------
+		//  Ensure that watermarks are output only when all
+		//  channels have been input some watermark.
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(0), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(0), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(0), 2);
+		assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that watermarks are output as soon as the overall min
+		//  watermark across all channels has advanced.
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(12), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(8), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(10), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(15), 1);
+		// lowest watermark across all channels is now channel 2, with watermark @ 10
+		assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that decreasing watermarks are ignored
+		// ------------------------------------------------------------------------
+
+		valve.inputWatermark(new Watermark(6), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that when some input channel becomes idle, that channel will
+		//  no longer be accounted for when advancing the watermark.
+		// ------------------------------------------------------------------------
+
+		// marking channel 2 as IDLE shouldn't result in overall status toggle for the valve,
+		// because there are still other active channels (0 and 1), so there should not be any
+		// stream status outputs;
+		// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
+		// so the valve should output that
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// from now on, since channel 2 is IDLE, the valve should use watermarks only from
+		// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
+		valve.inputWatermark(new Watermark(17), 0);
+		assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(25), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(20), 1);
+		assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that after some channel resumes to be ACTIVE, it needs to
+		//  "catch up" with the current overall min watermark before it can be
+		//  accounted for again when finding the min watermark across channels.
+		//  Also tests that before the resumed channel catches up, the overall
+		//  min watermark can still advance with watermarks of other channels.
+		// ------------------------------------------------------------------------
+
+		// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
+		// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// although watermarks for channel 2 will now be accepted, it still
+		// hasn't caught up with the overall min watermark (20)
+		valve.inputWatermark(new Watermark(18), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
+		valve.inputWatermark(new Watermark(22), 1);
+		assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(28), 0);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(33), 1);
+		assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now, channel 2 has caught up with the overall min watermark
+		valve.inputWatermark(new Watermark(30), 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(31), 0);
+		// this acknowledges that channel 2's watermark is being accounted for again
+		assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(34), 2);
+		assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that once all channels are IDLE, the valve should also
+		//  determine itself to be IDLE and output an IDLE stream status
+		// ------------------------------------------------------------------------
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 0);
+		// this is because once channel 0 becomes IDLE,
+		// the new min watermark will be 33 (channel 1)
+		assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 2);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputStreamStatus(StreamStatus.IDLE, 1);
+		assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// ------------------------------------------------------------------------
+		//  Ensure that as channels gradually become ACTIVE again, the above behaviours
+		//  still hold. Also ensure that as soon as one of the input channels
+		//  becomes ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status.
+		// ------------------------------------------------------------------------
+
+		// let channel 0 resume to be ACTIVE
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+		assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// channel 0 is the only ACTIVE channel now, and is the only channel
+		// accounted for when advancing min watermark
+		valve.inputWatermark(new Watermark(36), 0);
+		assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now also let channel 1 become ACTIVE
+		valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// channel 1 is still behind overall min watermark
+		valve.inputWatermark(new Watermark(35), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// since channel 1 is still behind, channel 0 remains to be the only
+		// channel used to advance min watermark
+		valve.inputWatermark(new Watermark(37), 0);
+		assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		// now, channel 1 has caught up with the overall min watermark
+		valve.inputWatermark(new Watermark(38), 1);
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+		valve.inputWatermark(new Watermark(40), 0);
+		// this acknowledges that channel 1's watermark is being accounted for again
+		assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark());
+		assertTrue(valveOutput.hasNoOutputWatermarks());
+		assertTrue(valveOutput.hasNoOutputStreamStatuses());
+	}
+
+	private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
+		private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>();
+		private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>();
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			outputWatermarks.add(watermark);
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			outputStreamStatuses.add(streamStatus);
+		}
+
+		public Watermark popLastOutputWatermark() {
+			return outputWatermarks.poll();
+		}
+
+		public StreamStatus popLastOutputStreamStatus() {
+			return outputStreamStatuses.poll();
+		}
+
+		public boolean hasNoOutputWatermarks() {
+			return outputWatermarks.size() == 0;
+		}
+
+		public boolean hasNoOutputStreamStatuses() {
+			return outputStreamStatuses.size() == 0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
new file mode 100644
index 0000000..247dc8b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class StreamStatusTest {
+
+	@Test (expected = IllegalArgumentException.class)
+	public void testIllegalCreationThrowsException() {
+		new StreamStatus(32);
+	}
+
+	@Test
+	public void testEquals() {
+		StreamStatus idleStatus = new StreamStatus(StreamStatus.IDLE_STATUS);
+		StreamStatus activeStatus = new StreamStatus(StreamStatus.ACTIVE_STATUS);
+
+		assertEquals(StreamStatus.IDLE, idleStatus);
+		assertTrue(idleStatus.isIdle());
+		assertFalse(idleStatus.isActive());
+
+		assertEquals(StreamStatus.ACTIVE, activeStatus);
+		assertTrue(activeStatus.isActive());
+		assertFalse(activeStatus.isIdle());
+	}
+
+	@Test
+	public void testTypeCasting() {
+		StreamStatus status = StreamStatus.ACTIVE;
+
+		assertTrue(status.isStreamStatus());
+		assertFalse(status.isRecord());
+		assertFalse(status.isWatermark());
+		assertFalse(status.isLatencyMarker());
+
+		try {
+			status.asWatermark();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+
+		try {
+			status.asRecord();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+
+		try {
+			status.asLatencyMarker();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index be93f6a..4b08c83 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -38,15 +39,18 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -59,14 +63,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -98,6 +95,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
@@ -126,15 +124,20 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether
 	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 * forwarded watermark must be the minimum of the watermarks of all active inputs.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
+	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<String, String>(
+				mapTask, 2, 2,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -180,8 +183,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		expectedOutput.add(new Watermark(initialTime + 2));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-
-		// advance watermark from one of the inputs, now we should get a now one since the
+		// advance watermark from one of the inputs, now we should get a new one since the
 		// minimum increases
 		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
 		testHarness.waitForInputProcessing();
@@ -196,6 +198,31 @@ public class OneInputStreamTaskTest extends TestLogger {
 		expectedOutput.add(new Watermark(initialTime + 4));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
+		// test whether idle input channels are acknowledged correctly when forwarding watermarks
+		testHarness.processElement(StreamStatus.IDLE, 0, 1);
+		testHarness.processElement(StreamStatus.IDLE, 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
+		testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged,
+		                                                     // watermark (initial + 6) should be forwarded
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 5));
+		expectedOutput.add(new Watermark(initialTime + 6));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make all input channels idle and check that the operator's idle status is forwarded
+		testHarness.processElement(StreamStatus.IDLE, 0, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.IDLE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make some input channels active again and check that the operator's active status is forwarded only once
+		testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+		testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.ACTIVE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();
@@ -205,12 +232,170 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	/**
+	 * This test verifies that watermarks are not forwarded when the task is idle.
+	 * It also verifies that when the task is idle, watermarks generated in the middle of chains are also blocked and
+	 * never forwarded.
+	 *
+	 * The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator).
+	 * The operators will throw an exception and fail the test if any of them is forwarded a watermark while
+	 * the task is idle.
+	 */
+	@Test
+	public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
+		final OneInputStreamTask<String, String> testTask = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<String, String>(
+				testTask, 1, 1,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		// ------------------ setup the chain ------------------
+
+		TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator();
+		StreamConfig headOperatorConfig = testHarness.getStreamConfig();
+
+		WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
+		StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration());
+
+		TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator();
+		StreamConfig tailOperatorConfig = new StreamConfig(new Configuration());
+
+		headOperatorConfig.setStreamOperator(headOperator);
+		headOperatorConfig.setChainStart();
+		headOperatorConfig.setChainIndex(0);
+		headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
+			new StreamNode(null, 0, null, null, null, null, null),
+			new StreamNode(null, 1, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			null
+		)));
+
+		watermarkOperatorConfig.setStreamOperator(watermarkOperator);
+		watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+		watermarkOperatorConfig.setChainIndex(1);
+		watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
+			new StreamNode(null, 1, null, null, null, null, null),
+			new StreamNode(null, 2, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			null
+		)));
+
+		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+		outEdgesInOrder.add(new StreamEdge(
+			new StreamNode(null, 2, null, null, null, null, null),
+			new StreamNode(null, 3, null, null, null, null, null),
+			0,
+			Collections.<String>emptyList(),
+			new BroadcastPartitioner<Object>()));
+
+		tailOperatorConfig.setStreamOperator(tailOperator);
+		tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+		tailOperatorConfig.setBufferTimeout(0);
+		tailOperatorConfig.setChainIndex(2);
+		tailOperatorConfig.setChainEnd();
+		tailOperatorConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
+		tailOperatorConfig.setNumberOfOutputs(1);
+		tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+		tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder);
+		tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE);
+
+		Map<Integer, StreamConfig> chainedConfigs = new HashMap<>(2);
+		chainedConfigs.put(1, watermarkOperatorConfig);
+		chainedConfigs.put(2, tailOperatorConfig);
+		headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+		headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+
+		// -----------------------------------------------------
+
+		// --------------------- begin test ---------------------
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		// the task starts as active, so all generated watermarks should be forwarded
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+		testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+
+		// this watermark will be forwarded since the task is currently active,
+		// but should not be in the final output because it should be blocked by the watermark generator in the chain
+		testHarness.processElement(new Watermark(15));
+
+		testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+		testHarness.waitForInputProcessing();
+
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("10"));
+		expectedOutput.add(new Watermark(10));
+		expectedOutput.add(new StreamRecord<>("20"));
+		expectedOutput.add(new Watermark(20));
+		expectedOutput.add(new StreamRecord<>("30"));
+		expectedOutput.add(new Watermark(30));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// now, toggle the task to be idle, and let the watermark generator produce some watermarks
+		testHarness.processElement(StreamStatus.IDLE);
+
+		// after this, the operators will throw an exception if they are forwarded watermarks anywhere in the chain
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+
+		// NOTE: normally, tasks will not have records to process while idle;
+		// we're doing this here only to mimic watermark generating in operators
+		testHarness.processElement(new StreamRecord<>("40"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("50"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("60"), 0, 0);
+		testHarness.processElement(new Watermark(65)); // the test will fail if any of the operators were forwarded this
+		testHarness.waitForInputProcessing();
+
+		// the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and records
+		expectedOutput.add(StreamStatus.IDLE);
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("40"));
+		expectedOutput.add(new StreamRecord<>("50"));
+		expectedOutput.add(new StreamRecord<>("60"));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// re-toggle the task to be active and see if new watermarks are correctly forwarded again
+		testHarness.processElement(StreamStatus.ACTIVE);
+		testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+		testHarness.processElement(new StreamRecord<>("70"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("80"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("90"), 0, 0);
+		testHarness.waitForInputProcessing();
+
+		expectedOutput.add(StreamStatus.ACTIVE);
+		expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+		expectedOutput.add(new StreamRecord<>("70"));
+		expectedOutput.add(new Watermark(70));
+		expectedOutput.add(new StreamRecord<>("80"));
+		expectedOutput.add(new Watermark(80));
+		expectedOutput.add(new StreamRecord<>("90"));
+		expectedOutput.add(new Watermark(90));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		assertEquals(12, resultElements.size());
+	}
+
+	/**
 	 * This test verifies that checkpoint barriers are correctly forwarded.
 	 */
 	@Test
 	public void testCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -269,6 +454,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
@@ -339,6 +525,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 		final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 		final OneInputStreamTask<String, String> streamTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(streamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
+
 		IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
 		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -658,5 +846,80 @@ public class OneInputStreamTaskTest extends TestLogger {
 			return value;
 		}
 	}
+
+	/**
+	 * A {@link TriggerableFailOnWatermarkTestOperator} that generates watermarks.
+	 */
+	private static class WatermarkGeneratingTestOperator extends TriggerableFailOnWatermarkTestOperator {
+
+		private static final long serialVersionUID = -5064871833244157221L;
+
+		private long lastWatermark;
+
+		@Override
+		protected void handleElement(StreamRecord<String> element) {
+			long timestamp = Long.valueOf(element.getValue());
+			if (timestamp > lastWatermark) {
+				output.emitWatermark(new Watermark(timestamp));
+				lastWatermark = timestamp;
+			}
+		}
+
+		@Override
+		protected void handleWatermark(Watermark mark) {
+			if (mark.equals(Watermark.MAX_WATERMARK)) {
+				output.emitWatermark(mark);
+				lastWatermark = Long.MAX_VALUE;
+			}
+		}
+	}
+
+	/**
+	 * An operator that can be toggled to either expect or not expect watermarks being forwarded to it;
+	 * the toggle is flipped by letting it process special trigger marker records.
+	 *
+	 * If it receives a watermark when it's not expecting one, it'll throw an exception and fail.
+	 */
+	private static class TriggerableFailOnWatermarkTestOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<String, String> {
+
+		private static final long serialVersionUID = 2048954179291813243L;
+
+		public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
+		public final static String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
+
+		protected boolean expectForwardedWatermarks;
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			output.collect(element);
+
+			if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
+				this.expectForwardedWatermarks = true;
+			} else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
+				this.expectForwardedWatermarks = false;
+			} else {
+				handleElement(element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			if (!expectForwardedWatermarks) {
+				throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks.");
+			} else {
+				handleWatermark(mark);
+			}
+		}
+
+		protected void handleElement(StreamRecord<String> element) {
+			// do nothing
+		}
+
+		protected void handleWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index dd1fe58..0773699 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -57,6 +57,7 @@ public class SourceStreamTaskTest {
 	public void testOpenClose() throws Exception {
 		final SourceStreamTask<String, SourceFunction<String>, StreamSource<String, SourceFunction<String>>> sourceTask = new SourceStreamTask<>();
 		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamSource<String, ?> sourceOperator = new StreamSource<>(new OpenCloseTestSource());
@@ -99,6 +100,7 @@ public class SourceStreamTaskTest {
 			final SourceStreamTask<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>,
 				StreamSource<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>();
 			final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
+			testHarness.setupOutputForSingletonOperatorChain();
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
 			StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index ebe5285..c2d4aaa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -50,6 +50,7 @@ public class StreamTaskCancellationBarrierTest {
 	public void testEmitCancellationBarrierWhenNotReady() throws Exception {
 		StreamTask<String, ?> task = new InitBlockingTask();
 		StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		// start the test - this cannot succeed across the 'init()' method
 		testHarness.invoke();
@@ -80,6 +81,7 @@ public class StreamTaskCancellationBarrierTest {
 				task,
 				1, 2,
 				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
@@ -124,6 +126,7 @@ public class StreamTaskCancellationBarrierTest {
 		TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
 				task,
 				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 8dc6afa..e58bc5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -106,9 +106,6 @@ public class StreamTaskTestHarness<OUT> {
 		this.executionConfig = new ExecutionConfig();
 
 		streamConfig = new StreamConfig(taskConfig);
-		streamConfig.setChainStart();
-		streamConfig.setBufferTimeout(0);
-		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
@@ -129,11 +126,25 @@ public class StreamTaskTestHarness<OUT> {
 	@SuppressWarnings("unchecked")
 	private void initializeOutput() {
 		outputList = new ConcurrentLinkedQueue<Object>();
-
 		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
+	}
 
+	/**
+	 * Users of the test harness can call this utility method to setup the stream config
+	 * if there will only be a single operator to be tested. The method will setup the
+	 * outgoing network connection for the operator.
+	 *
+	 * For more advanced test cases such as testing chains of multiple operators with the harness,
+	 * please manually configure the stream config.
+	 */
+	public void setupOutputForSingletonOperatorChain() {
+		streamConfig.setChainStart();
+		streamConfig.setBufferTimeout(0);
+		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
 		streamConfig.setNumberOfOutputs(1);
+		streamConfig.setTypeSerializerOut(outputSerializer);
+		streamConfig.setVertexID(0);
 
 		StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
 			private static final long serialVersionUID = 1L;
@@ -142,13 +153,10 @@ public class StreamTaskTestHarness<OUT> {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
 		StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 		StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
 		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+
 		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
 		streamConfig.setNonChainedOutputs(outEdgesInOrder);
-		streamConfig.setTypeSerializerOut(outputSerializer);
-		streamConfig.setVertexID(0);
-
 	}
 
 	public StreamMockEnvironment createEnvironment() {
@@ -330,9 +338,6 @@ public class StreamTaskTestHarness<OUT> {
 					allEmpty = false;
 				}
 			}
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException ignored) {}
 
 			if (allEmpty) {
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3cd9c9a..c0a1638 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.junit.Assert;
@@ -58,6 +59,7 @@ public class TwoInputStreamTaskTest {
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
@@ -89,15 +91,21 @@ public class TwoInputStreamTaskTest {
 	}
 
 	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether
 	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 * forwarded watermark must be the minimum of the watermarks of all active inputs.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
+	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
+			new TwoInputStreamTaskTestHarness<String, Integer, String>(
+				coMapTask, 2, 2, new int[] {1, 2},
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -147,7 +155,7 @@ public class TwoInputStreamTaskTest {
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
 
-		// advance watermark from one of the inputs, now we should get a now one since the
+		// advance watermark from one of the inputs, now we should get a new one since the
 		// minimum increases
 		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
 		testHarness.waitForInputProcessing();
@@ -162,6 +170,33 @@ public class TwoInputStreamTaskTest {
 		expectedOutput.add(new Watermark(initialTime + 4));
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
+		// test whether idle input channels are acknowledged correctly when forwarding watermarks
+		testHarness.processElement(StreamStatus.IDLE, 0, 1);
+		testHarness.processElement(StreamStatus.IDLE, 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
+		testHarness.processElement(StreamStatus.IDLE, 1, 1); // even once this is acknowledged,
+		                                                     // watermark (initial + 6) must NOT be forwarded
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 5));
+		// We don't expect to see Watermark(6) here because the idle status of one
+		// input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input
+		// 2 was at WM 5 before going to IDLE, then the output watermark will not jump to WM 6.
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make all input channels idle and check that the operator's idle status is forwarded
+		testHarness.processElement(StreamStatus.IDLE, 0, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.IDLE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// make some input channels active again and check that the operator's active status is forwarded only once
+		testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+		testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(StreamStatus.ACTIVE);
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();
@@ -178,6 +213,7 @@ public class TwoInputStreamTaskTest {
 	public void testCheckpointBarriers() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -258,6 +294,7 @@ public class TwoInputStreamTaskTest {
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());


[2/2] flink git commit: [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

Posted by al...@apache.org.
[FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

This commit is the first part of making idle streaming sources in Flink
possible. It introduces a new element, StreamStatus, that flows with
other records in streams. StreamStatus elements are generated at the
sources, and affect how operators advance their watermarks in the
presence of idle sources.

Prior to this commit, when advancing watermarks at downstream operators,
the new min watermark was found by simply determining if the min
watermark across all input channels had advanced. This caused watermarks
to stall at downstream operators whenever there were idle sources.
With this change, operators can now mark input channels to be idle, and
ignore them when advancing their watermark.

This commit also includes refactoring of previous watermark forwarding
logic into a single class, StatusWatermarkValve. OneInputStreamTasks,
TwoInputStreamTasks, and AbstractStreamOperator use valves to help them
determine how watermarks and stream statuses are forwarded.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66305135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66305135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66305135

Branch: refs/heads/master
Commit: 66305135bcfe0841fdc9d26fdc0d8f373fa58b62
Parents: 0241032
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Nov 14 10:53:18 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 10 14:20:42 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |   2 +
 .../runtime/io/RecordWriterOutput.java          |  24 +-
 .../runtime/io/StreamInputProcessor.java        | 124 ++++--
 .../runtime/io/StreamTwoInputProcessor.java     | 236 +++++++----
 ...TimestampsAndPeriodicWatermarksOperator.java |   5 +
 ...mestampsAndPunctuatedWatermarksOperator.java |   5 +
 .../runtime/streamrecord/StreamElement.java     |  18 +
 .../streamrecord/StreamElementSerializer.java   |  18 +-
 .../streamstatus/StatusWatermarkValve.java      | 199 ++++++++++
 .../runtime/streamstatus/StreamStatus.java      | 128 ++++++
 .../streamstatus/StreamStatusProvider.java      |  34 ++
 .../runtime/tasks/OneInputStreamTask.java       |  27 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  88 +++-
 .../runtime/tasks/StreamIterationTail.java      |  12 +-
 .../streaming/runtime/tasks/StreamTask.java     |   3 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  21 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   3 +
 .../api/streamtask/StreamIterationHeadTest.java |   1 +
 .../runtime/operators/StreamTaskTimerTest.java  |   2 +
 .../TestProcessingTimeServiceTest.java          |   1 +
 ...stampsAndPeriodicWatermarksOperatorTest.java |   2 -
 .../streamstatus/StatusWatermarkValveTest.java  | 398 +++++++++++++++++++
 .../runtime/streamstatus/StreamStatusTest.java  |  80 ++++
 .../runtime/tasks/OneInputStreamTaskTest.java   | 293 +++++++++++++-
 .../runtime/tasks/SourceStreamTaskTest.java     |   2 +
 .../StreamTaskCancellationBarrierTest.java      |   3 +
 .../runtime/tasks/StreamTaskTestHarness.java    |  27 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  47 ++-
 28 files changed, 1617 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 46a184a..6587291 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,6 +119,7 @@ public class RocksDBAsyncSnapshotTest {
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
 			@Override
@@ -219,6 +220,7 @@ public class RocksDBAsyncSnapshotTest {
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 2625031..51c6cd7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -27,9 +27,11 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -43,11 +45,13 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	
 	private SerializationDelegate<StreamElement> serializationDelegate;
 
+	private final StreamStatusProvider streamStatusProvider;
 	
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
 			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
-			TypeSerializer<OUT> outSerializer) {
+			TypeSerializer<OUT> outSerializer,
+			StreamStatusProvider streamStatusProvider) {
 
 		checkNotNull(recordWriter);
 		
@@ -62,6 +66,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		if (outSerializer != null) {
 			serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
 		}
+
+		this.streamStatusProvider = checkNotNull(streamStatusProvider);
 	}
 
 	@Override
@@ -79,7 +85,19 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	@Override
 	public void emitWatermark(Watermark mark) {
 		serializationDelegate.setInstance(mark);
-		
+
+		if (streamStatusProvider.getStreamStatus().isActive()) {
+			try {
+				recordWriter.broadcastEmit(serializationDelegate);
+			} catch (Exception e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		}
+	}
+
+	public void emitStreamStatus(StreamStatus streamStatus) {
+		serializationDelegate.setInstance(streamStatus);
+
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index b3257a5..e2061c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -43,20 +43,26 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
  * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events,
+ * and forwards them to event subscribers once the {@link StatusWatermarkValve} determines the {@link Watermark} from
+ * all inputs has advanced, or that a {@link StreamStatus} needs to be propagated downstream to denote a status change.
  *
  * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * Forwarding elements, watermarks, or stream status elements must be protected by synchronizing on the given lock
  * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently
  * with the timer callback or other things.
  * 
@@ -69,29 +75,48 @@ public class StreamInputProcessor<IN> {
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
+	private final DeserializationDelegate<StreamElement> deserializationDelegate;
+
 	private final CheckpointBarrierHandler barrierHandler;
 
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
+	private final Object lock;
 
-	private boolean isFinished;
+	// ---------------- Status and Watermark Valve ------------------
 
-	private final long[] watermarks;
-	private long lastEmittedWatermark;
+	/** Valve that controls how watermarks and stream statuses are forwarded. */
+	private StatusWatermarkValve statusWatermarkValve;
 
-	private final DeserializationDelegate<StreamElement> deserializationDelegate;
+	/** Number of input channels the valve needs to handle. */
+	private final int numInputChannels;
 
+	/**
+	 * The channel from which a buffer came, tracked so that we can appropriately map
+	 * the watermarks and stream statuses to channel indexes of the valve.
+	 */
+	private int currentChannel = -1;
+
+	private final OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain;
+	
+	private final OneInputStreamOperator<IN, ?> streamOperator;
+
+	// ---------------- Metrics ------------------
+
+	private long lastEmittedWatermark;
 	private Counter numRecordsIn;
 
+	private boolean isFinished;
+
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(
 			InputGate[] inputGates,
 			TypeSerializer<IN> inputSerializer,
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
+			Object lock,
 			IOManager ioManager,
-			Configuration taskManagerConfig) throws IOException {
+			Configuration taskManagerConfig,
+			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain,
+			OneInputStreamOperator<IN, ?> streamOperator) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -114,6 +139,8 @@ public class StreamInputProcessor<IN> {
 		if (checkpointedTask != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
+
+		this.lock = checkNotNull(lock);
 		
 		StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer);
 		this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);
@@ -126,15 +153,19 @@ public class StreamInputProcessor<IN> {
 					ioManager.getSpillingDirectoriesPaths());
 		}
 
-		watermarks = new long[inputGate.getNumberOfInputChannels()];
-		for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
-			watermarks[i] = Long.MIN_VALUE;
-		}
-		lastEmittedWatermark = Long.MIN_VALUE;
+		this.numInputChannels = inputGate.getNumberOfInputChannels();
+
+		this.lastEmittedWatermark = Long.MIN_VALUE;
+
+		this.operatorChain = checkNotNull(operatorChain);
+		this.streamOperator = checkNotNull(streamOperator);
+
+		this.statusWatermarkValve = new StatusWatermarkValve(
+				numInputChannels,
+				new ForwardingValveOutputHandler(streamOperator, lock));
 	}
 
-	@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
+	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -155,22 +186,14 @@ public class StreamInputProcessor<IN> {
 					StreamElement recordOrMark = deserializationDelegate.getInstance();
 
 					if (recordOrMark.isWatermark()) {
-						long watermarkMillis = recordOrMark.asWatermark().getTimestamp();
-						if (watermarkMillis > watermarks[currentChannel]) {
-							watermarks[currentChannel] = watermarkMillis;
-							long newMinWatermark = Long.MAX_VALUE;
-							for (long watermark: watermarks) {
-								newMinWatermark = Math.min(watermark, newMinWatermark);
-							}
-							if (newMinWatermark > lastEmittedWatermark) {
-								lastEmittedWatermark = newMinWatermark;
-								synchronized (lock) {
-									streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
-								}
-							}
-						}
+						// handle watermark
+						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
+						continue;
+					} else if (recordOrMark.isStreamStatus()) {
+						// handle stream status
+						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
 						continue;
-					} else if(recordOrMark.isLatencyMarker()) {
+					} else if (recordOrMark.isLatencyMarker()) {
 						// handle latency marker
 						synchronized (lock) {
 							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
@@ -247,4 +270,39 @@ public class StreamInputProcessor<IN> {
 		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
+
+	private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
+		private final OneInputStreamOperator<IN, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler(final OneInputStreamOperator<IN, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark = watermark.getTimestamp();
+					operator.processWatermark(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					operatorChain.setStreamStatus(streamStatus);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e5aeec1..a295395 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -39,23 +39,29 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
  *
  * <p>
- * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
- * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
+ * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events,
+ * and forwards watermarks to event subscribers once the {@link StatusWatermarkValve} determines the watermarks from
+ * all inputs have advanced, or changes the task's {@link StreamStatus} once a status change is toggled.
  *
  * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * Forwarding elements, watermarks, or stream status elements must be protected by synchronizing on the given lock
  * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently
  * with the timer callback or other things.
  *
@@ -69,26 +75,50 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
-
-	private boolean isFinished;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
 
 	private final CheckpointBarrierHandler barrierHandler;
 
-	private final long[] watermarks1;
-	private long lastEmittedWatermark1;
+	private final Object lock;
 
-	private final long[] watermarks2;
-	private long lastEmittedWatermark2;
+	// ---------------- Status and Watermark Valves ------------------
 
+	/**
+	 * Stream status for the two inputs. We need to keep track for determining when
+	 * to forward stream status changes downstream.
+	 */
+	private StreamStatus firstStatus;
+	private StreamStatus secondStatus;
+
+	/**
+	 * Valves that control how watermarks and stream statuses from the 2 inputs are forwarded.
+	 */
+	private StatusWatermarkValve statusWatermarkValve1;
+	private StatusWatermarkValve statusWatermarkValve2;
+
+	/** Number of input channels the valves need to handle. */
 	private final int numInputChannels1;
+	private final int numInputChannels2;
 
-	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
-	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
+	/**
+	 * The channel from which a buffer came, tracked so that we can appropriately map
+	 * the watermarks and stream statuses to the correct channel index of the correct valve.
+	 */
+	private int currentChannel = -1;
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
+	private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain;
+
+	private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
+
+	// ---------------- Metrics ------------------
+
+	private long lastEmittedWatermark1;
+	private long lastEmittedWatermark2;
+
+	private boolean isFinished;
+
+	@SuppressWarnings("unchecked")
 	public StreamTwoInputProcessor(
 			Collection<InputGate> inputGates1,
 			Collection<InputGate> inputGates2,
@@ -96,8 +126,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			TypeSerializer<IN2> inputSerializer2,
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
+			Object lock,
 			IOManager ioManager,
-			Configuration taskManagerConfig) throws IOException {
+			Configuration taskManagerConfig,
+			OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain,
+			TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException {
 
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -120,6 +153,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		if (checkpointedTask != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
+
+		this.lock = checkNotNull(lock);
 		
 		StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1);
 		this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1);
@@ -142,19 +177,23 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 		
 		this.numInputChannels1 = numInputChannels1;
-		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+		this.numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
 
-		watermarks1 = new long[numInputChannels1];
-		Arrays.fill(watermarks1, Long.MIN_VALUE);
-		lastEmittedWatermark1 = Long.MIN_VALUE;
+		this.lastEmittedWatermark1 = Long.MIN_VALUE;
+		this.lastEmittedWatermark2 = Long.MIN_VALUE;
+
+		this.firstStatus = StreamStatus.ACTIVE;
+		this.secondStatus = StreamStatus.ACTIVE;
+
+		this.operatorChain = checkNotNull(operatorChain);
+		this.streamOperator = checkNotNull(streamOperator);
+
+		this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock));
+		this.statusWatermarkValve2 = new StatusWatermarkValve(numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock));
 
-		watermarks2 = new long[numInputChannels2];
-		Arrays.fill(watermarks2, Long.MIN_VALUE);
-		lastEmittedWatermark2 = Long.MIN_VALUE;
 	}
 
-	@SuppressWarnings("unchecked")
-	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Object lock) throws Exception {
+	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -177,7 +216,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					if (currentChannel < numInputChannels1) {
 						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
+							statusWatermarkValve1.inputWatermark(recordOrWatermark.asWatermark(), currentChannel);
+							continue;
+						}
+						else if (recordOrWatermark.isStreamStatus()) {
+							statusWatermarkValve1.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel);
 							continue;
 						}
 						else if (recordOrWatermark.isLatencyMarker()) {
@@ -187,9 +230,10 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 							continue;
 						}
 						else {
+							StreamRecord<IN1> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
-								streamOperator.setKeyContextElement1(recordOrWatermark.<IN1>asRecord());
-								streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
+								streamOperator.setKeyContextElement1(record);
+								streamOperator.processElement1(record);
 							}
 							return true;
 
@@ -198,7 +242,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					else {
 						StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
+							statusWatermarkValve2.inputWatermark(recordOrWatermark.asWatermark(), currentChannel - numInputChannels1);
+							continue;
+						}
+						else if (recordOrWatermark.isStreamStatus()) {
+							statusWatermarkValve2.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel - numInputChannels1);
 							continue;
 						}
 						else if (recordOrWatermark.isLatencyMarker()) {
@@ -208,9 +256,10 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 							continue;
 						}
 						else {
+							StreamRecord<IN2> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
-								streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());
-								streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
+								streamOperator.setKeyContextElement2(record);
+								streamOperator.processElement2(record);
 							}
 							return true;
 						}
@@ -244,41 +293,6 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 	}
 
-	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex, Object lock) throws Exception {
-		if (channelIndex < numInputChannels1) {
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks1[channelIndex]) {
-				watermarks1[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks1) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark1) {
-					lastEmittedWatermark1 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark1(new Watermark(lastEmittedWatermark1));
-					}
-				}
-			}
-		} else {
-			channelIndex = channelIndex - numInputChannels1;
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks2[channelIndex]) {
-				watermarks2[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks2) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark2) {
-					lastEmittedWatermark2 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark2(new Watermark(lastEmittedWatermark2));
-					}
-				}
-			}
-		}
-	}
-
 	/**
 	 * Sets the metric group for this StreamTwoInputProcessor.
 	 *
@@ -312,4 +326,92 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
+
+	private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler {
+		private final TwoInputStreamOperator<IN1, IN2, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler1(final TwoInputStreamOperator<IN1, IN2, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark1 = watermark.getTimestamp();
+					operator.processWatermark1(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					firstStatus = streamStatus;
+
+					// check if we need to toggle the task's stream status
+					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+						if (streamStatus.isActive()) {
+							// we're no longer idle if at least one input has become active
+							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+						} else if (secondStatus.isIdle()) {
+							// we're idle once both inputs are idle
+							operatorChain.setStreamStatus(StreamStatus.IDLE);
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
+
+	private class ForwardingValveOutputHandler2 implements StatusWatermarkValve.ValveOutputHandler {
+		private final TwoInputStreamOperator<IN1, IN2, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler2(final TwoInputStreamOperator<IN1, IN2, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark2 = watermark.getTimestamp();
+					operator.processWatermark2(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					secondStatus = streamStatus;
+
+					// check if we need to toggle the task's stream status
+					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+						if (streamStatus.isActive()) {
+							// we're no longer idle if at least one input has become active
+							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+						} else if (firstStatus.isIdle()) {
+							// we're idle once both inputs are idle
+							operatorChain.setStreamStatus(StreamStatus.IDLE);
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index 4defb96..0863580 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -82,6 +82,11 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 	}
 
+	/**
+	 * Override the base implementation to ignore watermarks propagated from upstream,
+	 * except for a Long.MAX_VALUE watermark, which is forwarded (we otherwise rely only
+	 * on the {@link AssignerWithPeriodicWatermarks} to emit watermarks from here).
+	 */
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		// if we receive a Long.MAX_VALUE watermark we forward it since it is used

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
index ac85b8a..3fc9f9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
@@ -59,6 +59,11 @@ public class TimestampsAndPunctuatedWatermarksOperator<T>
 		}
 	}
 
+	/**
+	 * Override the base implementation to ignore watermarks propagated from upstream,
+	 * except for a Long.MAX_VALUE watermark, which is forwarded (we otherwise rely only
+	 * on the {@link AssignerWithPunctuatedWatermarks} to emit watermarks from here).
+	 */
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		// if we receive a Long.MAX_VALUE watermark we forward it since it is used

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index 62418bc..643e240 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
 /**
  * An element in a data stream. Can be a record or a Watermark.
@@ -36,6 +37,14 @@ public abstract class StreamElement {
 	}
 
 	/**
+	 * Checks whether this element is a stream status.
+	 * @return True, if this element is a stream status, false otherwise.
+	 */
+	public final boolean isStreamStatus() {
+		return getClass() == StreamStatus.class;
+	}
+
+	/**
 	 * Checks whether this element is a record.
 	 * @return True, if this element is a record, false otherwise.
 	 */
@@ -71,6 +80,15 @@ public abstract class StreamElement {
 	}
 
 	/**
+	 * Casts this element into a StreamStatus.
+	 * @return This element as a StreamStatus.
+	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a Stream Status.
+	 */
+	public final StreamStatus asStreamStatus() {
+		return (StreamStatus) this;
+	}
+
+	/**
 	 * Casts this element into a LatencyMarker.
 	 * @return This element as a LatencyMarker.
 	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a LatencyMarker.

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 66d32da..3db649a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -23,13 +23,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
 import java.io.IOException;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- * Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}.
+ * Serializer for {@link StreamRecord}, {@link Watermark}, {@link LatencyMarker}, and {@link StreamStatus}.
  *
  * <p>
  * This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the
@@ -46,6 +47,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
 	private static final int TAG_WATERMARK = 2;
 	private static final int TAG_LATENCY_MARKER = 3;
+	private static final int TAG_STREAM_STATUS = 4;
 	
 	
 	private final TypeSerializer<T> typeSerializer;
@@ -98,7 +100,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			StreamRecord<T> fromRecord = from.asRecord();
 			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
 		}
-		else if (from.isWatermark() || from.isLatencyMarker()) {
+		else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -117,7 +119,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			fromRecord.copyTo(valueCopy, reuseRecord);
 			return reuse;
 		}
-		else if (from.isWatermark() || from.isLatencyMarker()) {
+		else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -142,6 +144,9 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 		else if (tag == TAG_WATERMARK) {
 			target.writeLong(source.readLong());
 		}
+		else if (tag == TAG_STREAM_STATUS) {
+			target.writeInt(source.readInt());
+		}
 		else if (tag == TAG_LATENCY_MARKER) {
 			target.writeLong(source.readLong());
 			target.writeInt(source.readInt());
@@ -168,6 +173,10 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			target.write(TAG_WATERMARK);
 			target.writeLong(value.asWatermark().getTimestamp());
 		}
+		else if (value.isStreamStatus()) {
+			target.write(TAG_STREAM_STATUS);
+			target.writeInt(value.asStreamStatus().getStatus());
+		}
 		else if (value.isLatencyMarker()) {
 			target.write(TAG_LATENCY_MARKER);
 			target.writeLong(value.asLatencyMarker().getMarkedTime());
@@ -192,6 +201,9 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 		else if (tag == TAG_WATERMARK) {
 			return new Watermark(source.readLong());
 		}
+		else if (tag == TAG_STREAM_STATUS) {
+			return new StreamStatus(source.readInt());
+		}
 		else if (tag == TAG_LATENCY_MARKER) {
 			return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
new file mode 100644
index 0000000..f17d240
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to
+ * downstream outputs, given a set of one or multiple input channels that continuously receive them. Usages of this
+ * class need to define the number of input channels that the valve needs to handle, as well as provide a customized
+ * implementation of {@link ValveOutputHandler}, which is called by the valve only when it determines a new watermark or
+ * stream status can be propagated.
+ */
+@Internal
+public class StatusWatermarkValve {
+
+	/**
+	 * Usages of {@code StatusWatermarkValve} should implement a {@code ValveOutputHandler}
+	 * to handle watermark and stream status outputs from the valve.
+	 */
+	public interface ValveOutputHandler {
+		void handleWatermark(Watermark watermark);
+		void handleStreamStatus(StreamStatus streamStatus);
+	}
+
+	private final ValveOutputHandler outputHandler;
+
+	// ------------------------------------------------------------------------
+	//	Runtime state for watermark & stream status output determination
+	// ------------------------------------------------------------------------
+
+	/** Array of current status of all input channels. Changes as watermarks & stream statuses are fed into the valve */
+	private final InputChannelStatus[] channelStatuses;
+
+	/** The last watermark emitted from the valve */
+	private long lastOutputWatermark;
+
+	/** The last stream status emitted from the valve */
+	private StreamStatus lastOutputStreamStatus;
+
+	/**
+	 * Returns a new {@code StatusWatermarkValve}.
+	 *
+	 * @param numInputChannels the number of input channels that this valve will need to handle
+	 * @param outputHandler the customized output handler for the valve
+	 */
+	public StatusWatermarkValve(int numInputChannels, ValveOutputHandler outputHandler) {
+		checkArgument(numInputChannels > 0);
+		this.channelStatuses = new InputChannelStatus[numInputChannels];
+		for (int i = 0; i < numInputChannels; i++) {
+			channelStatuses[i] = new InputChannelStatus();
+			channelStatuses[i].watermark = Long.MIN_VALUE;
+			channelStatuses[i].streamStatus = StreamStatus.ACTIVE;
+			channelStatuses[i].isWatermarkAligned = true;
+		}
+
+		this.outputHandler = checkNotNull(outputHandler);
+
+		this.lastOutputWatermark = Long.MIN_VALUE;
+		this.lastOutputStreamStatus = StreamStatus.ACTIVE;
+	}
+
+	/**
+	 * Feed a {@link Watermark} into the valve. If the input triggers the valve to output a new Watermark,
+	 * {@link ValveOutputHandler#handleWatermark(Watermark)} will be called to process the new Watermark.
+	 *
+	 * @param watermark the watermark to feed to the valve
+	 * @param channelIndex the index of the channel that the fed watermark belongs to (index starting from 0)
+	 */
+	public void inputWatermark(Watermark watermark, int channelIndex) {
+		// ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
+		if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
+			long watermarkMillis = watermark.getTimestamp();
+
+			// if the input watermark's value is not greater than the last received watermark for its input channel, ignore it also.
+			if (watermarkMillis > channelStatuses[channelIndex].watermark) {
+				channelStatuses[channelIndex].watermark = watermarkMillis;
+
+				// a previously unaligned input channel is now aligned if its watermark has caught up
+				if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
+					channelStatuses[channelIndex].isWatermarkAligned = true;
+				}
+
+				// now, attempt to find a new min watermark across all aligned channels
+				findAndOutputNewMinWatermarkAcrossAlignedChannels();
+			}
+		}
+	}
+
+	/**
+	 * Feed a {@link StreamStatus} into the valve. This may trigger the valve to output either a new Stream Status,
+	 * for which {@link ValveOutputHandler#handleStreamStatus(StreamStatus)} will be called, or a new Watermark,
+	 * for which {@link ValveOutputHandler#handleWatermark(Watermark)} will be called.
+	 *
+	 * @param streamStatus the stream status to feed to the valve
+	 * @param channelIndex the index of the channel that the fed stream status belongs to (index starting from 0)
+	 */
+	public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) {
+		// only account for stream status inputs that will result in a status change for the input channel
+		if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) {
+			// handle active -> idle toggle for the input channel
+			channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;
+
+			// the channel is now idle, therefore not aligned
+			channelStatuses[channelIndex].isWatermarkAligned = false;
+
+			// if all input channels of the valve are now idle, we need to output an idle stream
+			// status from the valve (this also marks the valve as idle)
+			if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {
+				lastOutputStreamStatus = StreamStatus.IDLE;
+				outputHandler.handleStreamStatus(lastOutputStreamStatus);
+			} else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
+				// if the watermark of the channel that just became idle equals the last output
+				// watermark (the previous overall min watermark), we may be able to find a new
+				// min watermark from the remaining aligned channels
+				findAndOutputNewMinWatermarkAcrossAlignedChannels();
+			}
+		} else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) {
+			// handle idle -> active toggle for the input channel
+			channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;
+
+			// if the last watermark of the input channel, before it was marked idle, is still larger than or equal to
+			// the overall last output watermark of the valve, then we can set the channel to be aligned already.
+			if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) {
+				channelStatuses[channelIndex].isWatermarkAligned = true;
+			}
+
+			// if the valve was previously marked to be idle, mark it as active and output an active stream
+			// status because at least one of the input channels is now active
+			if (lastOutputStreamStatus.isIdle()) {
+				lastOutputStreamStatus = StreamStatus.ACTIVE;
+				outputHandler.handleStreamStatus(lastOutputStreamStatus);
+			}
+		}
+	}
+
+	private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
+		long newMinWatermark = Long.MAX_VALUE;
+
+		// determine new overall watermark by considering only watermark-aligned channels across all channels
+		for (InputChannelStatus channelStatus : channelStatuses) {
+			if (channelStatus.isWatermarkAligned) {
+				newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
+			}
+		}
+
+		// we acknowledge and output the new overall watermark if it is larger than the last output watermark
+		if (newMinWatermark > lastOutputWatermark) {
+			lastOutputWatermark = newMinWatermark;
+			outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
+		}
+	}
+
+	/**
+	 * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream status, and whether or not
+	 * the channel's current watermark is aligned with the overall watermark output from the valve.
+	 *
+	 * There are 2 situations where a channel's watermark is not considered aligned:
+	 *  - the current stream status of the channel is idle
+	 *  - the stream status has resumed to be active, but the watermark of the channel hasn't caught up to the
+	 *    last output watermark from the valve yet.
+	 */
+	private static class InputChannelStatus {
+		private long watermark;
+		private StreamStatus streamStatus;
+		private boolean isWatermarkAligned;
+
+		/** Utility to check if at least one channel in a given array of input channels is active */
+		private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) {
+			for (InputChannelStatus status : channelStatuses) {
+				if (status.streamStatus.isActive()) {
+					return true;
+				}
+			}
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
new file mode 100644
index 0000000..e82fad0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A Stream Status element informs stream tasks whether or not they should continue to expect records and watermarks
+ * from the input stream that sent them. There are 2 kinds of status, namely {@link StreamStatus#IDLE} and
+ * {@link StreamStatus#ACTIVE}. Stream Status elements are generated at the sources, and may be propagated through
+ * the tasks of the topology. They directly reflect the current status of the emitting task; a {@link SourceStreamTask} or
+ * {@link StreamTask} emits a {@link StreamStatus#IDLE} if it will temporarily stop emitting any records or watermarks
+ * (i.e. is idle), and emits a {@link StreamStatus#ACTIVE} once it resumes doing so (i.e. is active). Tasks are
+ * responsible for propagating their status further downstream once they toggle between being idle and active. The cases
+ * in which source tasks and downstream tasks are considered either idle or active are explained below:
+ *
+ * <ul>
+ *     <li>Source tasks: A source task is considered to be idle if its head operator, i.e. a {@link StreamSource}, will
+ *         not emit records for an indefinite amount of time. This is the case, for example, for Flink's Kafka Consumer,
+ *         where sources might initially have no assigned partitions to read from, or no records can be read from the
+ *         assigned partitions. Once the head {@link StreamSource} operator detects that it will resume emitting data,
+ *         the source task is considered to be active. {@link StreamSource}s are responsible for toggling the status
+ *         of the containing source task and ensuring that no records (and possibly watermarks, in the case of Flink's
+ *         Kafka Consumer which can generate watermarks directly within the source) will be emitted while the task is
+ *         idle. This guarantee should be enforced on sources through
+ *         {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext} implementations.</li>
+ *
+ *     <li>Downstream tasks: a downstream task is considered to be idle if all its input streams are idle, i.e. the last
+ *         received Stream Status element from all input streams is a {@link StreamStatus#IDLE}. As long as one of its
+ *         input streams is active, i.e. the last received Stream Status element from the input stream is
+ *         {@link StreamStatus#ACTIVE}, the task is active.</li>
+ * </ul>
+ *
+ * <p>
+ * Stream Status elements received at downstream tasks also affect and control how their operators process and advance
+ * their watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which
+ * downstream tasks should use for such purposes):
+ *
+ * <ul>
+ *     <li>Since source tasks guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and
+ *         {@link StreamStatus#ACTIVE}, downstream tasks can always safely process and propagate records through their
+ *         operator chain when they receive them, without the need to check whether or not the task is currently idle or
+ *         active. However, for watermarks, since there may be watermark generators that might produce watermarks
+ *         anywhere in the middle of topologies regardless of whether there are input data at the operator, the current
+ *         status of the task must be checked before forwarding watermarks emitted from
+ *         an operator. If the status is actually idle, the watermark must be blocked.</li>
+ *
+ *     <li>For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle,
+ *         or have resumed to be active but whose watermarks are behind the overall min watermark of the operator, should
+ *         not be accounted for when deciding whether or not to advance the watermark and propagate it through the
+ *         operator chain.</li>
+ * </ul>
+ *
+ * <p>
+ * Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more
+ * elements, the source should still send a {@link Watermark#MAX_WATERMARK} instead of {@link StreamStatus#IDLE}.
+ * Stream Status elements only serve as markers for temporary status.
+ */
+@Internal
+public final class StreamStatus extends StreamElement {
+
+	public static final int IDLE_STATUS = -1;
+	public static final int ACTIVE_STATUS = 0;
+
+	public static final StreamStatus IDLE = new StreamStatus(IDLE_STATUS);
+	public static final StreamStatus ACTIVE = new StreamStatus(ACTIVE_STATUS);
+
+	public final int status;
+
+	public StreamStatus(int status) {
+		if (status != IDLE_STATUS && status != ACTIVE_STATUS) {
+			throw new IllegalArgumentException("Invalid status value for StreamStatus; " +
+				"allowed values are " + ACTIVE_STATUS + " (for ACTIVE) and " + IDLE_STATUS + " (for IDLE).");
+		}
+
+		this.status = status;
+	}
+
+	public boolean isIdle() {
+		return this.status == IDLE_STATUS;
+	}
+
+	public boolean isActive() {
+		return !isIdle();
+	}
+
+	public int getStatus() {
+		return status;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o ||
+			o != null && o.getClass() == StreamStatus.class && ((StreamStatus) o).status == this.status;
+	}
+
+	@Override
+	public int hashCode() {
+		return status;
+	}
+
+	@Override
+	public String toString() {
+		String statusStr = (status == ACTIVE_STATUS) ? "ACTIVE" : "IDLE";
+		return "StreamStatus(" + statusStr + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
new file mode 100644
index 0000000..ae8d9af
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for retrieving the current {@link StreamStatus}.
+ */
+@Internal
+public interface StreamStatusProvider {
+
+	/**
+	 * Returns the current stream status.
+	 *
+	 * @return current stream status.
+	 */
+	StreamStatus getStreamStatus();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0f41103..e559ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -35,18 +35,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 	@Override
 	public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
-		
+
 		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
 		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
-			inputProcessor = new StreamInputProcessor<IN>(
-					inputGates, inSerializer,
-					this, 
+
+			@SuppressWarnings("unchecked")
+			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain =
+					(OperatorChain) this.operatorChain;
+
+			inputProcessor = new StreamInputProcessor<>(
+					inputGates,
+					inSerializer,
+					this,
 					configuration.getCheckpointMode(),
+					getCheckpointLock(),
 					getEnvironment().getIOManager(),
-					getEnvironment().getTaskManagerInfo().getConfiguration());
+					getEnvironment().getTaskManagerInfo().getConfiguration(),
+					operatorChain,
+					this.headOperator);
 
 			// make sure that stream tasks report their I/O statistics
 			inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
@@ -55,12 +64,10 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
+		// cache processor reference on the stack, to make the code more JIT friendly
 		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
+
+		while (running && inputProcessor.processInput()) {
 			// all the work happens in the "processInput" method
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 6d01795..7e24eea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -30,18 +30,20 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +63,7 @@ import java.util.Random;
  *              head operator.
  */
 @Internal
-public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
+public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusProvider {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
 	
@@ -73,6 +75,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
 	private final OP headOperator;
 
+	/**
+	 * Current status of the input stream of the operator chain.
+	 * Watermarks explicitly generated by operators in the chain (i.e. timestamp
+	 * assigner / watermark extractors), will be blocked and not forwarded if
+	 * this value is {@link StreamStatus#IDLE}.
+	 */
+	private StreamStatus streamStatus = StreamStatus.ACTIVE;
+
 	public OperatorChain(StreamTask<OUT, OP> containingTask) {
 		
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
@@ -110,7 +120,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 					chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
 
 			if (headOperator != null) {
-				headOperator.setup(containingTask, configuration, getChainEntryPoint());
+				Output output = getChainEntryPoint();
+				headOperator.setup(containingTask, configuration, output);
 			}
 
 			// add head operator to end of chain
@@ -135,6 +146,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 	}
 
+	@Override
+	public StreamStatus getStreamStatus() {
+		return streamStatus;
+	}
+
+	public void setStreamStatus(StreamStatus status) throws IOException {
+		if (!status.equals(this.streamStatus)) {
+			this.streamStatus = status;
+
+			// try and forward the stream status change to all outgoing connections
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.emitStreamStatus(status);
+			}
+		}
+	}
+
 
 	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
 		try {
@@ -219,7 +246,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	//  initialization utilities
 	// ------------------------------------------------------------------------
 	
-	private static <T> Output<StreamRecord<T>> createOutputCollector(
+	private <T> Output<StreamRecord<T>> createOutputCollector(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
 			Map<Integer, StreamConfig> chainedConfigs,
@@ -233,7 +260,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
 			@SuppressWarnings("unchecked")
 			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
-			
+
 			allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge));
 		}
 
@@ -270,9 +297,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 				// If the chaining output does not copy we need to copy in the broadcast output,
 				// otherwise multi-chaining would not work correctly.
 				if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-					return new CopyingBroadcastingOutputCollector<>(asArray);
+					return new CopyingBroadcastingOutputCollector<>(asArray, this);
 				} else  {
-					return new BroadcastingOutputCollector<>(asArray);
+					return new BroadcastingOutputCollector<>(asArray, this);
 				}
 			}
 		}
@@ -291,7 +318,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		}
 	}
 	
-	private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
+	private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
 			Map<Integer, StreamConfig> chainedConfigs,
@@ -310,15 +337,15 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		allOperators.add(chainedOperator);
 
 		if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-			return new ChainingOutput<>(chainedOperator);
+			return new ChainingOutput<>(chainedOperator, this);
 		}
 		else {
 			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-			return new CopyingChainingOutput<>(chainedOperator, inSerializer);
+			return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
 		}
 	}
 	
-	private static <T> RecordWriterOutput<T> createStreamOutput(
+	private <T> RecordWriterOutput<T> createStreamOutput(
 			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
 			Environment taskEnvironment,
 			String taskName)
@@ -344,7 +371,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
 		
-		return new RecordWriterOutput<>(output, outSerializer);
+		return new RecordWriterOutput<>(output, outSerializer, this);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -356,9 +383,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		protected final OneInputStreamOperator<T, ?> operator;
 		protected final Counter numRecordsIn;
 
-		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
+		protected final StreamStatusProvider streamStatusProvider;
+
+		public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
 			this.operator = operator;
 			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			this.streamStatusProvider = streamStatusProvider;
 		}
 
 		@Override
@@ -376,7 +406,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		@Override
 		public void emitWatermark(Watermark mark) {
 			try {
-				operator.processWatermark(mark);
+				if (streamStatusProvider.getStreamStatus().isActive()) {
+					operator.processWatermark(mark);
+				}
 			}
 			catch (Exception e) {
 				throw new ExceptionInChainedOperatorException(e);
@@ -408,8 +440,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 		private final TypeSerializer<T> serializer;
 		
-		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
-			super(operator);
+		public CopyingChainingOutput(
+				OneInputStreamOperator<T, ?> operator,
+				TypeSerializer<T> serializer,
+				StreamStatusProvider streamStatusProvider) {
+			super(operator, streamStatusProvider);
 			this.serializer = serializer;
 		}
 
@@ -432,15 +467,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		protected final Output<StreamRecord<T>>[] outputs;
 
 		private final Random RNG = new XORShiftRandom();
+
+		private final StreamStatusProvider streamStatusProvider;
 		
-		public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+		public BroadcastingOutputCollector(
+				Output<StreamRecord<T>>[] outputs,
+				StreamStatusProvider streamStatusProvider) {
 			this.outputs = outputs;
+			this.streamStatusProvider = streamStatusProvider;
 		}
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			for (Output<StreamRecord<T>> output : outputs) {
-				output.emitWatermark(mark);
+			if (streamStatusProvider.getStreamStatus().isActive()) {
+				for (Output<StreamRecord<T>> output : outputs) {
+					output.emitWatermark(mark);
+				}
 			}
 		}
 
@@ -477,8 +519,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	 */
 	private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
-		public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-			super(outputs);
+		public CopyingBroadcastingOutputCollector(
+				Output<StreamRecord<T>>[] outputs,
+				StreamStatusProvider streamStatusProvider) {
+			super(outputs, streamStatusProvider);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index a5f94ad..cdac11a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -38,8 +38,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 	@Override
 	public void init() throws Exception {
-		super.init();
-		
+
 		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
@@ -51,15 +50,18 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 
 		LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
-		
+
 		@SuppressWarnings("unchecked")
 		BlockingQueue<StreamRecord<IN>> dataChannel =
 				(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
-		
+
 		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
-		
+
 		this.headOperator = new RecordPusher<>();
 		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
+
+		// call super.init() last because that needs this.headOperator to be set up
+		super.init();
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 63843bb..2676b64 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -139,7 +139,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	protected OP headOperator;
 
 	/** The chain of operators executed by this task */
-	private OperatorChain<OUT, OP> operatorChain;
+	protected OperatorChain<OUT, OP> operatorChain;
 
 	/** The configuration of this streaming task */
 	private StreamConfig configuration;
@@ -637,7 +637,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		boolean restored = null != restoreStateHandles;
 
 		if (restored) {
-
 			checkRestorePreconditions(operatorChain.getChainLength());
 			initializeOperators(true);
 			restoreStateHandles = null; // free for GC

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 233e9f1..175bd76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -64,14 +64,21 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 					throw new RuntimeException("Invalid input type number: " + inputType);
 			}
 		}
-	
-		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(
+
+		@SuppressWarnings("unchecked")
+		OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain =
+				(OperatorChain) this.operatorChain;
+
+		this.inputProcessor = new StreamTwoInputProcessor<>(
 				inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
 				this,
 				configuration.getCheckpointMode(),
+				getCheckpointLock(),
 				getEnvironment().getIOManager(),
-				getEnvironment().getTaskManagerInfo().getConfiguration());
+				getEnvironment().getTaskManagerInfo().getConfiguration(),
+				operatorChain,
+				this.headOperator);
 
 		// make sure that stream tasks report their I/O statistics
 		inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
@@ -79,12 +86,10 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
+		// cache processor reference on the stack, to make the code more JIT friendly
 		final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
+
+		while (running && inputProcessor.processInput()) {
 			// all the work happens in the "processInput" method
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index c2b0803..0255ee6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -374,6 +374,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
 		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.taskConfig = chainedVertex.getConfiguration();
 
@@ -484,6 +485,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
 		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
 			new LazyAsyncFunction(),
@@ -536,6 +538,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness =
 				new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		restoredTaskHarness.setupOutputForSingletonOperatorChain();
 
 		AsyncWaitOperator<Integer, Integer> restoredOperator = new AsyncWaitOperator<>(
 			new MyAsyncFunction(),

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index 36cf53a..dafdeed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -33,6 +33,7 @@ public class StreamIterationHeadTest {
 		StreamIterationHead<Integer> head = new StreamIterationHead<>();
 		StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
 				BasicTypeInfo.INT_TYPE_INFO);
+		harness.setupOutputForSingletonOperatorChain();
 		harness.getStreamConfig().setIterationId("1");
 		harness.getStreamConfig().setIterationWaitTime(1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index e0e0e91..f41ec02 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -46,6 +46,7 @@ public class StreamTaskTimerTest {
 		
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
 				mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		
@@ -83,6 +84,7 @@ public class StreamTaskTimerTest {
 		try {
 			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.setupOutputForSingletonOperatorChain();
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
 			StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 4d24b82..9897884 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -44,6 +44,7 @@ public class TestProcessingTimeServiceTest {
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
 			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index f84836b..9ddea8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;