You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/10 14:50:04 UTC

[2/2] flink git commit: [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

[FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

This commit is the first part of making idle streaming sources in Flink
possible. It introduces a new element, StreamStatus, that flows with
other records in streams. StreamStatus elements are generated at the
sources, and affect how operators advance their watermarks in the
presence of idle sources.

Prior to this commit, when advancing watermarks at downstream operators,
the new min watermark is found by simply determining if the min
watermark across all input channels has advanced. This caused watermarks
to stall at downstream operators whenever there were idle sources.
With this change, operators can now mark input channels to be idle, and
ignore them when advancing their watermark.

This commit also includes refactoring of previous watermark forwarding
logic into a single class, StatusWatermarkValve. OneInputStreamTasks,
TwoInputStreamTasks, and AbstractStreamOperator use valves to help them
determine how watermarks and stream statuses are forwarded.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66305135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66305135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66305135

Branch: refs/heads/master
Commit: 66305135bcfe0841fdc9d26fdc0d8f373fa58b62
Parents: 0241032
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Nov 14 10:53:18 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 10 14:20:42 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |   2 +
 .../runtime/io/RecordWriterOutput.java          |  24 +-
 .../runtime/io/StreamInputProcessor.java        | 124 ++++--
 .../runtime/io/StreamTwoInputProcessor.java     | 236 +++++++----
 ...TimestampsAndPeriodicWatermarksOperator.java |   5 +
 ...mestampsAndPunctuatedWatermarksOperator.java |   5 +
 .../runtime/streamrecord/StreamElement.java     |  18 +
 .../streamrecord/StreamElementSerializer.java   |  18 +-
 .../streamstatus/StatusWatermarkValve.java      | 199 ++++++++++
 .../runtime/streamstatus/StreamStatus.java      | 128 ++++++
 .../streamstatus/StreamStatusProvider.java      |  34 ++
 .../runtime/tasks/OneInputStreamTask.java       |  27 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  88 +++-
 .../runtime/tasks/StreamIterationTail.java      |  12 +-
 .../streaming/runtime/tasks/StreamTask.java     |   3 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  21 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   3 +
 .../api/streamtask/StreamIterationHeadTest.java |   1 +
 .../runtime/operators/StreamTaskTimerTest.java  |   2 +
 .../TestProcessingTimeServiceTest.java          |   1 +
 ...stampsAndPeriodicWatermarksOperatorTest.java |   2 -
 .../streamstatus/StatusWatermarkValveTest.java  | 398 +++++++++++++++++++
 .../runtime/streamstatus/StreamStatusTest.java  |  80 ++++
 .../runtime/tasks/OneInputStreamTaskTest.java   | 293 +++++++++++++-
 .../runtime/tasks/SourceStreamTaskTest.java     |   2 +
 .../StreamTaskCancellationBarrierTest.java      |   3 +
 .../runtime/tasks/StreamTaskTestHarness.java    |  27 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  47 ++-
 28 files changed, 1617 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 46a184a..6587291 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,6 +119,7 @@ public class RocksDBAsyncSnapshotTest {
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
 			@Override
@@ -219,6 +220,7 @@ public class RocksDBAsyncSnapshotTest {
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 2625031..51c6cd7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -27,9 +27,11 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -43,11 +45,13 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	
 	private SerializationDelegate<StreamElement> serializationDelegate;
 
+	private final StreamStatusProvider streamStatusProvider;
 	
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
 			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
-			TypeSerializer<OUT> outSerializer) {
+			TypeSerializer<OUT> outSerializer,
+			StreamStatusProvider streamStatusProvider) {
 
 		checkNotNull(recordWriter);
 		
@@ -62,6 +66,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		if (outSerializer != null) {
 			serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
 		}
+
+		this.streamStatusProvider = checkNotNull(streamStatusProvider);
 	}
 
 	@Override
@@ -79,7 +85,19 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	@Override
 	public void emitWatermark(Watermark mark) {
 		serializationDelegate.setInstance(mark);
-		
+
+		if (streamStatusProvider.getStreamStatus().isActive()) {
+			try {
+				recordWriter.broadcastEmit(serializationDelegate);
+			} catch (Exception e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		}
+	}
+
+	public void emitStreamStatus(StreamStatus streamStatus) {
+		serializationDelegate.setInstance(streamStatus);
+
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index b3257a5..e2061c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -43,20 +43,26 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
  * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events,
+ * and forwards them to event subscribers once the {@link StatusWatermarkValve} determines the {@link Watermark} from
+ * all inputs has advanced, or that a {@link StreamStatus} needs to be propagated downstream to denote a status change.
  *
  * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * Forwarding elements, watermarks, or stream status elements must be protected by synchronizing on the given lock
  * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently
  * with the timer callback or other things.
  * 
@@ -69,29 +75,48 @@ public class StreamInputProcessor<IN> {
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
+	private final DeserializationDelegate<StreamElement> deserializationDelegate;
+
 	private final CheckpointBarrierHandler barrierHandler;
 
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
+	private final Object lock;
 
-	private boolean isFinished;
+	// ---------------- Status and Watermark Valve ------------------
 
-	private final long[] watermarks;
-	private long lastEmittedWatermark;
+	/** Valve that controls how watermarks and stream statuses are forwarded. */
+	private StatusWatermarkValve statusWatermarkValve;
 
-	private final DeserializationDelegate<StreamElement> deserializationDelegate;
+	/** Number of input channels the valve needs to handle. */
+	private final int numInputChannels;
 
+	/**
+	 * The channel from which a buffer came, tracked so that we can appropriately map
+	 * the watermarks and stream statuses to channel indexes of the valve.
+	 */
+	private int currentChannel = -1;
+
+	private final OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain;
+	
+	private final OneInputStreamOperator<IN, ?> streamOperator;
+
+	// ---------------- Metrics ------------------
+
+	private long lastEmittedWatermark;
 	private Counter numRecordsIn;
 
+	private boolean isFinished;
+
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(
 			InputGate[] inputGates,
 			TypeSerializer<IN> inputSerializer,
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
+			Object lock,
 			IOManager ioManager,
-			Configuration taskManagerConfig) throws IOException {
+			Configuration taskManagerConfig,
+			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain,
+			OneInputStreamOperator<IN, ?> streamOperator) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -114,6 +139,8 @@ public class StreamInputProcessor<IN> {
 		if (checkpointedTask != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
+
+		this.lock = checkNotNull(lock);
 		
 		StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer);
 		this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);
@@ -126,15 +153,19 @@ public class StreamInputProcessor<IN> {
 					ioManager.getSpillingDirectoriesPaths());
 		}
 
-		watermarks = new long[inputGate.getNumberOfInputChannels()];
-		for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
-			watermarks[i] = Long.MIN_VALUE;
-		}
-		lastEmittedWatermark = Long.MIN_VALUE;
+		this.numInputChannels = inputGate.getNumberOfInputChannels();
+
+		this.lastEmittedWatermark = Long.MIN_VALUE;
+
+		this.operatorChain = checkNotNull(operatorChain);
+		this.streamOperator = checkNotNull(streamOperator);
+
+		this.statusWatermarkValve = new StatusWatermarkValve(
+				numInputChannels,
+				new ForwardingValveOutputHandler(streamOperator, lock));
 	}
 
-	@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
+	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -155,22 +186,14 @@ public class StreamInputProcessor<IN> {
 					StreamElement recordOrMark = deserializationDelegate.getInstance();
 
 					if (recordOrMark.isWatermark()) {
-						long watermarkMillis = recordOrMark.asWatermark().getTimestamp();
-						if (watermarkMillis > watermarks[currentChannel]) {
-							watermarks[currentChannel] = watermarkMillis;
-							long newMinWatermark = Long.MAX_VALUE;
-							for (long watermark: watermarks) {
-								newMinWatermark = Math.min(watermark, newMinWatermark);
-							}
-							if (newMinWatermark > lastEmittedWatermark) {
-								lastEmittedWatermark = newMinWatermark;
-								synchronized (lock) {
-									streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
-								}
-							}
-						}
+						// handle watermark
+						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
+						continue;
+					} else if (recordOrMark.isStreamStatus()) {
+						// handle stream status
+						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
 						continue;
-					} else if(recordOrMark.isLatencyMarker()) {
+					} else if (recordOrMark.isLatencyMarker()) {
 						// handle latency marker
 						synchronized (lock) {
 							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
@@ -247,4 +270,39 @@ public class StreamInputProcessor<IN> {
 		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
+
+	private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
+		private final OneInputStreamOperator<IN, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler(final OneInputStreamOperator<IN, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark = watermark.getTimestamp();
+					operator.processWatermark(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					operatorChain.setStreamStatus(streamStatus);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e5aeec1..a295395 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -39,23 +39,29 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
  *
  * <p>
- * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
- * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
+ * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events,
+ * and forwards watermarks to event subscribers once the {@link StatusWatermarkValve} determines the watermarks from
+ * all inputs have advanced, or changes the task's {@link StreamStatus} once a status change is toggled.
  *
  * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * Forwarding elements, watermarks, or stream status elements must be protected by synchronizing on the given lock
  * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently
  * with the timer callback or other things.
  *
@@ -69,26 +75,50 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
-
-	private boolean isFinished;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
 
 	private final CheckpointBarrierHandler barrierHandler;
 
-	private final long[] watermarks1;
-	private long lastEmittedWatermark1;
+	private final Object lock;
 
-	private final long[] watermarks2;
-	private long lastEmittedWatermark2;
+	// ---------------- Status and Watermark Valves ------------------
 
+	/**
+	 * Stream status for the two inputs. We need to keep track for determining when
+	 * to forward stream status changes downstream.
+	 */
+	private StreamStatus firstStatus;
+	private StreamStatus secondStatus;
+
+	/**
+	 * Valves that control how watermarks and stream statuses from the 2 inputs are forwarded.
+	 */
+	private StatusWatermarkValve statusWatermarkValve1;
+	private StatusWatermarkValve statusWatermarkValve2;
+
+	/** Number of input channels the valves need to handle. */
 	private final int numInputChannels1;
+	private final int numInputChannels2;
 
-	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
-	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
+	/**
+	 * The channel from which a buffer came, tracked so that we can appropriately map
+	 * the watermarks and stream statuses to the correct channel index of the correct valve.
+	 */
+	private int currentChannel = -1;
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
+	private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain;
+
+	private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
+
+	// ---------------- Metrics ------------------
+
+	private long lastEmittedWatermark1;
+	private long lastEmittedWatermark2;
+
+	private boolean isFinished;
+
+	@SuppressWarnings("unchecked")
 	public StreamTwoInputProcessor(
 			Collection<InputGate> inputGates1,
 			Collection<InputGate> inputGates2,
@@ -96,8 +126,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			TypeSerializer<IN2> inputSerializer2,
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
+			Object lock,
 			IOManager ioManager,
-			Configuration taskManagerConfig) throws IOException {
+			Configuration taskManagerConfig,
+			OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain,
+			TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException {
 
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -120,6 +153,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		if (checkpointedTask != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
+
+		this.lock = checkNotNull(lock);
 		
 		StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1);
 		this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1);
@@ -142,19 +177,23 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 		
 		this.numInputChannels1 = numInputChannels1;
-		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+		this.numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
 
-		watermarks1 = new long[numInputChannels1];
-		Arrays.fill(watermarks1, Long.MIN_VALUE);
-		lastEmittedWatermark1 = Long.MIN_VALUE;
+		this.lastEmittedWatermark1 = Long.MIN_VALUE;
+		this.lastEmittedWatermark2 = Long.MIN_VALUE;
+
+		this.firstStatus = StreamStatus.ACTIVE;
+		this.secondStatus = StreamStatus.ACTIVE;
+
+		this.operatorChain = checkNotNull(operatorChain);
+		this.streamOperator = checkNotNull(streamOperator);
+
+		this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock));
+		this.statusWatermarkValve2 = new StatusWatermarkValve(numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock));
 
-		watermarks2 = new long[numInputChannels2];
-		Arrays.fill(watermarks2, Long.MIN_VALUE);
-		lastEmittedWatermark2 = Long.MIN_VALUE;
 	}
 
-	@SuppressWarnings("unchecked")
-	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Object lock) throws Exception {
+	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -177,7 +216,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					if (currentChannel < numInputChannels1) {
 						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
+							statusWatermarkValve1.inputWatermark(recordOrWatermark.asWatermark(), currentChannel);
+							continue;
+						}
+						else if (recordOrWatermark.isStreamStatus()) {
+							statusWatermarkValve1.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel);
 							continue;
 						}
 						else if (recordOrWatermark.isLatencyMarker()) {
@@ -187,9 +230,10 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 							continue;
 						}
 						else {
+							StreamRecord<IN1> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
-								streamOperator.setKeyContextElement1(recordOrWatermark.<IN1>asRecord());
-								streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
+								streamOperator.setKeyContextElement1(record);
+								streamOperator.processElement1(record);
 							}
 							return true;
 
@@ -198,7 +242,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					else {
 						StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
+							statusWatermarkValve2.inputWatermark(recordOrWatermark.asWatermark(), currentChannel - numInputChannels1);
+							continue;
+						}
+						else if (recordOrWatermark.isStreamStatus()) {
+							statusWatermarkValve2.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel - numInputChannels1);
 							continue;
 						}
 						else if (recordOrWatermark.isLatencyMarker()) {
@@ -208,9 +256,10 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 							continue;
 						}
 						else {
+							StreamRecord<IN2> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
-								streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());
-								streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
+								streamOperator.setKeyContextElement2(record);
+								streamOperator.processElement2(record);
 							}
 							return true;
 						}
@@ -244,41 +293,6 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 	}
 
-	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex, Object lock) throws Exception {
-		if (channelIndex < numInputChannels1) {
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks1[channelIndex]) {
-				watermarks1[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks1) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark1) {
-					lastEmittedWatermark1 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark1(new Watermark(lastEmittedWatermark1));
-					}
-				}
-			}
-		} else {
-			channelIndex = channelIndex - numInputChannels1;
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks2[channelIndex]) {
-				watermarks2[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks2) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark2) {
-					lastEmittedWatermark2 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark2(new Watermark(lastEmittedWatermark2));
-					}
-				}
-			}
-		}
-	}
-
 	/**
 	 * Sets the metric group for this StreamTwoInputProcessor.
 	 *
@@ -312,4 +326,92 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
+
+	private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler {
+		private final TwoInputStreamOperator<IN1, IN2, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler1(final TwoInputStreamOperator<IN1, IN2, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark1 = watermark.getTimestamp();
+					operator.processWatermark1(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					firstStatus = streamStatus;
+
+					// check if we need to toggle the task's stream status
+					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+						if (streamStatus.isActive()) {
+							// we're no longer idle if at least one input has become active
+							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+						} else if (secondStatus.isIdle()) {
+							// we're idle once both inputs are idle
+							operatorChain.setStreamStatus(StreamStatus.IDLE);
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
+
+	private class ForwardingValveOutputHandler2 implements StatusWatermarkValve.ValveOutputHandler {
+		private final TwoInputStreamOperator<IN1, IN2, ?> operator;
+		private final Object lock;
+
+		private ForwardingValveOutputHandler2(final TwoInputStreamOperator<IN1, IN2, ?> operator, final Object lock) {
+			this.operator = checkNotNull(operator);
+			this.lock = checkNotNull(lock);
+		}
+
+		@Override
+		public void handleWatermark(Watermark watermark) {
+			try {
+				synchronized (lock) {
+					lastEmittedWatermark2 = watermark.getTimestamp();
+					operator.processWatermark2(watermark);
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
+			}
+		}
+
+		@Override
+		public void handleStreamStatus(StreamStatus streamStatus) {
+			try {
+				synchronized (lock) {
+					secondStatus = streamStatus;
+
+					// check if we need to toggle the task's stream status
+					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+						if (streamStatus.isActive()) {
+							// we're no longer idle if at least one input has become active
+							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+						} else if (firstStatus.isIdle()) {
+							// we're idle once both inputs are idle
+							operatorChain.setStreamStatus(StreamStatus.IDLE);
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index 4defb96..0863580 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -82,6 +82,11 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 	}
 
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
+	 * watermarks from here).
+	 */
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		// if we receive a Long.MAX_VALUE watermark we forward it since it is used

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
index ac85b8a..3fc9f9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java
@@ -59,6 +59,11 @@ public class TimestampsAndPunctuatedWatermarksOperator<T>
 		}
 	}
 
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream (we rely only on the {@link AssignerWithPunctuatedWatermarks} to emit
+	 * watermarks from here).
+	 */
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		// if we receive a Long.MAX_VALUE watermark we forward it since it is used

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index 62418bc..643e240 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
 /**
  * An element in a data stream. Can be a record or a Watermark.
@@ -36,6 +37,14 @@ public abstract class StreamElement {
 	}
 
 	/**
+	 * Checks whether this element is a stream status.
+	 * @return True, if this element is a stream status, false otherwise.
+	 */
+	public final boolean isStreamStatus() {
+		return getClass() == StreamStatus.class;
+	}
+
+	/**
 	 * Checks whether this element is a record.
 	 * @return True, if this element is a record, false otherwise.
 	 */
@@ -71,6 +80,15 @@ public abstract class StreamElement {
 	}
 
 	/**
+	 * Casts this element into a StreamStatus.
+	 * @return This element as a StreamStatus.
+	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a Stream Status.
+	 */
+	public final StreamStatus asStreamStatus() {
+		return (StreamStatus) this;
+	}
+
+	/**
 	 * Casts this element into a LatencyMarker.
 	 * @return This element as a LatencyMarker.
 	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a LatencyMarker.

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 66d32da..3db649a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -23,13 +23,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
 import java.io.IOException;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- * Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}.
+ * Serializer for {@link StreamRecord}, {@link Watermark}, {@link LatencyMarker}, and {@link StreamStatus}.
  *
  * <p>
  * This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the
@@ -46,6 +47,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
 	private static final int TAG_WATERMARK = 2;
 	private static final int TAG_LATENCY_MARKER = 3;
+	private static final int TAG_STREAM_STATUS = 4;
 	
 	
 	private final TypeSerializer<T> typeSerializer;
@@ -98,7 +100,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			StreamRecord<T> fromRecord = from.asRecord();
 			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
 		}
-		else if (from.isWatermark() || from.isLatencyMarker()) {
+		else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -117,7 +119,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			fromRecord.copyTo(valueCopy, reuseRecord);
 			return reuse;
 		}
-		else if (from.isWatermark() || from.isLatencyMarker()) {
+		else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -142,6 +144,9 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 		else if (tag == TAG_WATERMARK) {
 			target.writeLong(source.readLong());
 		}
+		else if (tag == TAG_STREAM_STATUS) {
+			target.writeInt(source.readInt());
+		}
 		else if (tag == TAG_LATENCY_MARKER) {
 			target.writeLong(source.readLong());
 			target.writeInt(source.readInt());
@@ -168,6 +173,10 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			target.write(TAG_WATERMARK);
 			target.writeLong(value.asWatermark().getTimestamp());
 		}
+		else if (value.isStreamStatus()) {
+			target.write(TAG_STREAM_STATUS);
+			target.writeInt(value.asStreamStatus().getStatus());
+		}
 		else if (value.isLatencyMarker()) {
 			target.write(TAG_LATENCY_MARKER);
 			target.writeLong(value.asLatencyMarker().getMarkedTime());
@@ -192,6 +201,9 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 		else if (tag == TAG_WATERMARK) {
 			return new Watermark(source.readLong());
 		}
+		else if (tag == TAG_STREAM_STATUS) {
+			return new StreamStatus(source.readInt());
+		}
 		else if (tag == TAG_LATENCY_MARKER) {
 			return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
new file mode 100644
index 0000000..f17d240
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to
+ * downstream outputs, given a set of one or multiple input channels that continuously receive them. Usages of this
+ * class need to define the number of input channels that the valve needs to handle, as well as provide a customized
+ * implementation of {@link ValveOutputHandler}, which is called by the valve only when it determines a new watermark or
+ * stream status can be propagated.
+ */
+@Internal
+public class StatusWatermarkValve {
+
+	/**
+	 * Usages of {@code StatusWatermarkValve} should implement a {@code ValveOutputHandler}
+	 * to handle watermark and stream status outputs from the valve.
+	 */
+	public interface ValveOutputHandler {
+		void handleWatermark(Watermark watermark);
+		void handleStreamStatus(StreamStatus streamStatus);
+	}
+
+	private final ValveOutputHandler outputHandler;
+
+	// ------------------------------------------------------------------------
+	//	Runtime state for watermark & stream status output determination
+	// ------------------------------------------------------------------------
+
+	/** Array of current status of all input channels. Changes as watermarks & stream statuses are fed into the valve */
+	private final InputChannelStatus[] channelStatuses;
+
+	/** The last watermark emitted from the valve */
+	private long lastOutputWatermark;
+
+	/** The last stream status emitted from the valve */
+	private StreamStatus lastOutputStreamStatus;
+
+	/**
+	 * Returns a new {@code StatusWatermarkValve}.
+	 *
+	 * @param numInputChannels the number of input channels that this valve will need to handle
+	 * @param outputHandler the customized output handler for the valve
+	 */
+	public StatusWatermarkValve(int numInputChannels, ValveOutputHandler outputHandler) {
+		checkArgument(numInputChannels > 0);
+		this.channelStatuses = new InputChannelStatus[numInputChannels];
+		for (int i = 0; i < numInputChannels; i++) {
+			channelStatuses[i] = new InputChannelStatus();
+			channelStatuses[i].watermark = Long.MIN_VALUE;
+			channelStatuses[i].streamStatus = StreamStatus.ACTIVE;
+			channelStatuses[i].isWatermarkAligned = true;
+		}
+
+		this.outputHandler = checkNotNull(outputHandler);
+
+		this.lastOutputWatermark = Long.MIN_VALUE;
+		this.lastOutputStreamStatus = StreamStatus.ACTIVE;
+	}
+
+	/**
+	 * Feed a {@link Watermark} into the valve. If the input triggers the valve to output a new Watermark,
+	 * {@link ValveOutputHandler#handleWatermark(Watermark)} will be called to process the new Watermark.
+	 *
+	 * @param watermark the watermark to feed to the valve
+	 * @param channelIndex the index of the channel that the fed watermark belongs to (index starting from 0)
+	 */
+	public void inputWatermark(Watermark watermark, int channelIndex) {
+		// ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
+		if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
+			long watermarkMillis = watermark.getTimestamp();
+
+			// also ignore the input watermark if it does not advance past the last received watermark of its input channel.
+			if (watermarkMillis > channelStatuses[channelIndex].watermark) {
+				channelStatuses[channelIndex].watermark = watermarkMillis;
+
+				// a previously unaligned input channel becomes aligned again once its watermark has caught up
+				if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
+					channelStatuses[channelIndex].isWatermarkAligned = true;
+				}
+
+				// now, attempt to find a new min watermark across all aligned channels
+				findAndOutputNewMinWatermarkAcrossAlignedChannels();
+			}
+		}
+	}
+
+	/**
+	 * Feed a {@link StreamStatus} into the valve. This may trigger the valve to output either a new Stream Status,
+	 * for which {@link ValveOutputHandler#handleStreamStatus(StreamStatus)} will be called, or a new Watermark,
+	 * for which {@link ValveOutputHandler#handleWatermark(Watermark)} will be called.
+	 *
+	 * @param streamStatus the stream status to feed to the valve
+	 * @param channelIndex the index of the channel that the fed stream status belongs to (index starting from 0)
+	 */
+	public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) {
+		// only account for stream status inputs that will result in a status change for the input channel
+		if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) {
+			// handle active -> idle toggle for the input channel
+			channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;
+
+			// the channel is now idle, therefore not aligned
+			channelStatuses[channelIndex].isWatermarkAligned = false;
+
+			// if all input channels of the valve are now idle, we need to output an idle stream
+			// status from the valve (this also marks the valve as idle)
+			if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {
+				lastOutputStreamStatus = StreamStatus.IDLE;
+				outputHandler.handleStreamStatus(lastOutputStreamStatus);
+			} else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
+				// if the watermark of the channel that just became idle equals the last output
+				// watermark (the previous overall min watermark), we may be able to find a new
+				// min watermark from the remaining aligned channels
+				findAndOutputNewMinWatermarkAcrossAlignedChannels();
+			}
+		} else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) {
+			// handle idle -> active toggle for the input channel
+			channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;
+
+			// if the last watermark of the input channel, before it was marked idle, is still larger than or equal
+			// to the overall last output watermark of the valve, then we can set the channel to be aligned already.
+			if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) {
+				channelStatuses[channelIndex].isWatermarkAligned = true;
+			}
+
+			// if the valve was previously marked to be idle, mark it as active and output an active stream
+			// status because at least one of the input channels is now active
+			if (lastOutputStreamStatus.isIdle()) {
+				lastOutputStreamStatus = StreamStatus.ACTIVE;
+				outputHandler.handleStreamStatus(lastOutputStreamStatus);
+			}
+		}
+	}
+
+	private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
+		long newMinWatermark = Long.MAX_VALUE;
+
+		// determine new overall watermark by considering only watermark-aligned channels across all channels
+		for (InputChannelStatus channelStatus : channelStatuses) {
+			if (channelStatus.isWatermarkAligned) {
+				newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
+			}
+		}
+
+		// we acknowledge and output the new overall watermark if it is larger than the last output watermark
+		if (newMinWatermark > lastOutputWatermark) {
+			lastOutputWatermark = newMinWatermark;
+			outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
+		}
+	}
+
+	/**
+	 * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream status, and whether or not
+	 * the channel's current watermark is aligned with the overall watermark output from the valve.
+	 *
+	 * There are 2 situations where a channel's watermark is not considered aligned:
+	 *  - the current stream status of the channel is idle
+	 *  - the stream status has resumed to be active, but the watermark of the channel hasn't caught up to the
+	 *    last output watermark from the valve yet.
+	 */
+	private static class InputChannelStatus {
+		private long watermark;
+		private StreamStatus streamStatus;
+		private boolean isWatermarkAligned;
+
+		/** Utility to check if at least one channel in a given array of input channels is active */
+		private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) {
+			for (InputChannelStatus status : channelStatuses) {
+				if (status.streamStatus.isActive()) {
+					return true;
+				}
+			}
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
new file mode 100644
index 0000000..e82fad0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A Stream Status element informs stream tasks whether or not they should continue to expect records and watermarks
+ * from the input stream that sent them. There are 2 kinds of status, namely {@link StreamStatus#IDLE} and
+ * {@link StreamStatus#ACTIVE}. Stream Status elements are generated at the sources, and may be propagated through
+ * the tasks of the topology. They directly reflect the current status of the emitting task; a {@link SourceStreamTask} or
+ * {@link StreamTask} emits a {@link StreamStatus#IDLE} if it will temporarily stop emitting any records or watermarks
+ * (i.e. is idle), and emits a {@link StreamStatus#ACTIVE} once it resumes doing so (i.e. is active). Tasks are
+ * responsible for propagating their status further downstream once they toggle between being idle and active. The cases
+ * in which source tasks and downstream tasks are considered either idle or active are explained below:
+ *
+ * <ul>
+ *     <li>Source tasks: A source task is considered to be idle if its head operator, i.e. a {@link StreamSource}, will
+ *         not emit records for an indefinite amount of time. This is the case, for example, for Flink's Kafka Consumer,
+ *         where sources might initially have no assigned partitions to read from, or no records can be read from the
+ *         assigned partitions. Once the head {@link StreamSource} operator detects that it will resume emitting data,
+ *         the source task is considered to be active. {@link StreamSource}s are responsible for toggling the status
+ *         of the containing source task and ensuring that no records (and possibly watermarks, in the case of Flink's
+ *         Kafka Consumer which can generate watermarks directly within the source) will be emitted while the task is
+ *         idle. This guarantee should be enforced on sources through
+ *         {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext} implementations.</li>
+ *
+ *     <li>Downstream tasks: a downstream task is considered to be idle if all its input streams are idle, i.e. the last
+ *         received Stream Status element from all input streams is a {@link StreamStatus#IDLE}. As long as one of its
+ *         input streams is active, i.e. the last received Stream Status element from the input stream is
+ *         {@link StreamStatus#ACTIVE}, the task is active.</li>
+ * </ul>
+ *
+ * <p>
+ * Stream Status elements received at downstream tasks also affect and control how their operators process and advance
+ * their watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which
+ * downstream tasks should use for such purposes):
+ *
+ * <ul>
+ *     <li>Since source tasks guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and
+ *         {@link StreamStatus#ACTIVE}, downstream tasks can always safely process and propagate records through their
+ *         operator chain when they receive them, without the need to check whether or not the task is currently idle or
+ *         active. However, for watermarks, since there may be watermark generators that might produce watermarks
+ *         anywhere in the middle of topologies regardless of whether there are input data at the operator, the current
+ *         status of the task must be checked before forwarding watermarks emitted from
+ *         an operator. If the status is actually idle, the watermark must be blocked.</li>
+ *
+ *     <li>For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle,
+ *         or that have resumed to be active but whose watermark is still behind the overall min watermark of the
+ *         operator, should not be accounted for when deciding whether or not to advance the watermark that is
+ *         propagated through the operator chain.</li>
+ * </ul>
+ *
+ * <p>
+ * Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more
+ * elements, the source should still send a {@link Watermark#MAX_WATERMARK} instead of {@link StreamStatus#IDLE}.
+ * Stream Status elements only serve as markers for temporary status.
+ */
+@Internal
+public final class StreamStatus extends StreamElement {
+
+	public static final int IDLE_STATUS = -1;
+	public static final int ACTIVE_STATUS = 0;
+
+	public static final StreamStatus IDLE = new StreamStatus(IDLE_STATUS);
+	public static final StreamStatus ACTIVE = new StreamStatus(ACTIVE_STATUS);
+
+	public final int status;
+
+	public StreamStatus(int status) {
+		if (status != IDLE_STATUS && status != ACTIVE_STATUS) {
+			throw new IllegalArgumentException("Invalid status value for StreamStatus; " +
+				"allowed values are " + ACTIVE_STATUS + " (for ACTIVE) and " + IDLE_STATUS + " (for IDLE).");
+		}
+
+		this.status = status;
+	}
+
+	public boolean isIdle() {
+		return this.status == IDLE_STATUS;
+	}
+
+	public boolean isActive() {
+		return !isIdle();
+	}
+
+	public int getStatus() {
+		return status;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o ||
+			o != null && o.getClass() == StreamStatus.class && ((StreamStatus) o).status == this.status;
+	}
+
+	@Override
+	public int hashCode() {
+		return status;
+	}
+
+	@Override
+	public String toString() {
+		String statusStr = (status == ACTIVE_STATUS) ? "ACTIVE" : "IDLE";
+		return "StreamStatus(" + statusStr + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
new file mode 100644
index 0000000..ae8d9af
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for retrieving the current {@link StreamStatus}.
+ */
+@Internal
+public interface StreamStatusProvider {
+
+	/**
+	 * Returns the current stream status.
+	 *
+	 * @return current stream status.
+	 */
+	StreamStatus getStreamStatus();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0f41103..e559ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -35,18 +35,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 	@Override
 	public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
-		
+
 		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
 		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
-			inputProcessor = new StreamInputProcessor<IN>(
-					inputGates, inSerializer,
-					this, 
+
+			@SuppressWarnings("unchecked")
+			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain =
+					(OperatorChain) this.operatorChain;
+
+			inputProcessor = new StreamInputProcessor<>(
+					inputGates,
+					inSerializer,
+					this,
 					configuration.getCheckpointMode(),
+					getCheckpointLock(),
 					getEnvironment().getIOManager(),
-					getEnvironment().getTaskManagerInfo().getConfiguration());
+					getEnvironment().getTaskManagerInfo().getConfiguration(),
+					operatorChain,
+					this.headOperator);
 
 			// make sure that stream tasks report their I/O statistics
 			inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
@@ -55,12 +64,10 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
+		// cache processor reference on the stack, to make the code more JIT friendly
 		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
+
+		while (running && inputProcessor.processInput()) {
 			// all the work happens in the "processInput" method
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 6d01795..7e24eea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -30,18 +30,20 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +63,7 @@ import java.util.Random;
  *              head operator.
  */
 @Internal
-public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
+public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusProvider {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
 	
@@ -73,6 +75,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
 	private final OP headOperator;
 
+	/**
+	 * Current status of the input stream of the operator chain.
+	 * Watermarks explicitly generated by operators in the chain (i.e. timestamp
+	 * assigners / watermark extractors) will be blocked and not forwarded if
+	 * this value is {@link StreamStatus#IDLE}.
+	 */
+	private StreamStatus streamStatus = StreamStatus.ACTIVE;
+
 	public OperatorChain(StreamTask<OUT, OP> containingTask) {
 		
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
@@ -110,7 +120,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 					chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
 
 			if (headOperator != null) {
-				headOperator.setup(containingTask, configuration, getChainEntryPoint());
+				Output output = getChainEntryPoint();
+				headOperator.setup(containingTask, configuration, output);
 			}
 
 			// add head operator to end of chain
@@ -135,6 +146,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 	}
 
+	@Override
+	public StreamStatus getStreamStatus() {
+		return streamStatus;
+	}
+
+	public void setStreamStatus(StreamStatus status) throws IOException {
+		if (!status.equals(this.streamStatus)) {
+			this.streamStatus = status;
+
+			// try and forward the stream status change to all outgoing connections
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.emitStreamStatus(status);
+			}
+		}
+	}
+
 
 	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
 		try {
@@ -219,7 +246,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	//  initialization utilities
 	// ------------------------------------------------------------------------
 	
-	private static <T> Output<StreamRecord<T>> createOutputCollector(
+	private <T> Output<StreamRecord<T>> createOutputCollector(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
 			Map<Integer, StreamConfig> chainedConfigs,
@@ -233,7 +260,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
 			@SuppressWarnings("unchecked")
 			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
-			
+
 			allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge));
 		}
 
@@ -270,9 +297,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 				// If the chaining output does not copy we need to copy in the broadcast output,
 				// otherwise multi-chaining would not work correctly.
 				if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-					return new CopyingBroadcastingOutputCollector<>(asArray);
+					return new CopyingBroadcastingOutputCollector<>(asArray, this);
 				} else  {
-					return new BroadcastingOutputCollector<>(asArray);
+					return new BroadcastingOutputCollector<>(asArray, this);
 				}
 			}
 		}
@@ -291,7 +318,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		}
 	}
 	
-	private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
+	private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
 			Map<Integer, StreamConfig> chainedConfigs,
@@ -310,15 +337,15 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		allOperators.add(chainedOperator);
 
 		if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-			return new ChainingOutput<>(chainedOperator);
+			return new ChainingOutput<>(chainedOperator, this);
 		}
 		else {
 			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-			return new CopyingChainingOutput<>(chainedOperator, inSerializer);
+			return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
 		}
 	}
 	
-	private static <T> RecordWriterOutput<T> createStreamOutput(
+	private <T> RecordWriterOutput<T> createStreamOutput(
 			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
 			Environment taskEnvironment,
 			String taskName)
@@ -344,7 +371,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
 		
-		return new RecordWriterOutput<>(output, outSerializer);
+		return new RecordWriterOutput<>(output, outSerializer, this);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -356,9 +383,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		protected final OneInputStreamOperator<T, ?> operator;
 		protected final Counter numRecordsIn;
 
-		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
+		protected final StreamStatusProvider streamStatusProvider;
+
+		public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
 			this.operator = operator;
 			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			this.streamStatusProvider = streamStatusProvider;
 		}
 
 		@Override
@@ -376,7 +406,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		@Override
 		public void emitWatermark(Watermark mark) {
 			try {
-				operator.processWatermark(mark);
+				if (streamStatusProvider.getStreamStatus().isActive()) {
+					operator.processWatermark(mark);
+				}
 			}
 			catch (Exception e) {
 				throw new ExceptionInChainedOperatorException(e);
@@ -408,8 +440,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 		private final TypeSerializer<T> serializer;
 		
-		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
-			super(operator);
+		public CopyingChainingOutput(
+				OneInputStreamOperator<T, ?> operator,
+				TypeSerializer<T> serializer,
+				StreamStatusProvider streamStatusProvider) {
+			super(operator, streamStatusProvider);
 			this.serializer = serializer;
 		}
 
@@ -432,15 +467,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		protected final Output<StreamRecord<T>>[] outputs;
 
 		private final Random RNG = new XORShiftRandom();
+
+		private final StreamStatusProvider streamStatusProvider;
 		
-		public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+		public BroadcastingOutputCollector(
+				Output<StreamRecord<T>>[] outputs,
+				StreamStatusProvider streamStatusProvider) {
 			this.outputs = outputs;
+			this.streamStatusProvider = streamStatusProvider;
 		}
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			for (Output<StreamRecord<T>> output : outputs) {
-				output.emitWatermark(mark);
+			if (streamStatusProvider.getStreamStatus().isActive()) {
+				for (Output<StreamRecord<T>> output : outputs) {
+					output.emitWatermark(mark);
+				}
 			}
 		}
 
@@ -477,8 +519,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	 */
 	private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
-		public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
-			super(outputs);
+		public CopyingBroadcastingOutputCollector(
+				Output<StreamRecord<T>>[] outputs,
+				StreamStatusProvider streamStatusProvider) {
+			super(outputs, streamStatusProvider);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index a5f94ad..cdac11a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -38,8 +38,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 	@Override
 	public void init() throws Exception {
-		super.init();
-		
+
 		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
@@ -51,15 +50,18 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 
 		LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
-		
+
 		@SuppressWarnings("unchecked")
 		BlockingQueue<StreamRecord<IN>> dataChannel =
 				(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
-		
+
 		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
-		
+
 		this.headOperator = new RecordPusher<>();
 		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
+
+		// call super.init() last because that needs this.headOperator to be set up
+		super.init();
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 63843bb..2676b64 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -139,7 +139,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	protected OP headOperator;
 
 	/** The chain of operators executed by this task */
-	private OperatorChain<OUT, OP> operatorChain;
+	protected OperatorChain<OUT, OP> operatorChain;
 
 	/** The configuration of this streaming task */
 	private StreamConfig configuration;
@@ -637,7 +637,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		boolean restored = null != restoreStateHandles;
 
 		if (restored) {
-
 			checkRestorePreconditions(operatorChain.getChainLength());
 			initializeOperators(true);
 			restoreStateHandles = null; // free for GC

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 233e9f1..175bd76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -64,14 +64,21 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 					throw new RuntimeException("Invalid input type number: " + inputType);
 			}
 		}
-	
-		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(
+
+		@SuppressWarnings("unchecked")
+		OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain =
+				(OperatorChain) this.operatorChain;
+
+		this.inputProcessor = new StreamTwoInputProcessor<>(
 				inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
 				this,
 				configuration.getCheckpointMode(),
+				getCheckpointLock(),
 				getEnvironment().getIOManager(),
-				getEnvironment().getTaskManagerInfo().getConfiguration());
+				getEnvironment().getTaskManagerInfo().getConfiguration(),
+				operatorChain,
+				this.headOperator);
 
 		// make sure that stream tasks report their I/O statistics
 		inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
@@ -79,12 +86,10 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
+		// cache processor reference on the stack, to make the code more JIT friendly
 		final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
+
+		while (running && inputProcessor.processInput()) {
 			// all the work happens in the "processInput" method
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index c2b0803..0255ee6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -374,6 +374,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
 		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.taskConfig = chainedVertex.getConfiguration();
 
@@ -484,6 +485,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
 		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
 			new LazyAsyncFunction(),
@@ -536,6 +538,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness =
 				new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+		restoredTaskHarness.setupOutputForSingletonOperatorChain();
 
 		AsyncWaitOperator<Integer, Integer> restoredOperator = new AsyncWaitOperator<>(
 			new MyAsyncFunction(),

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index 36cf53a..dafdeed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -33,6 +33,7 @@ public class StreamIterationHeadTest {
 		StreamIterationHead<Integer> head = new StreamIterationHead<>();
 		StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
 				BasicTypeInfo.INT_TYPE_INFO);
+		harness.setupOutputForSingletonOperatorChain();
 		harness.getStreamConfig().setIterationId("1");
 		harness.getStreamConfig().setIterationWaitTime(1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index e0e0e91..f41ec02 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -46,6 +46,7 @@ public class StreamTaskTimerTest {
 		
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
 				mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 		
@@ -83,6 +84,7 @@ public class StreamTaskTimerTest {
 		try {
 			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+			testHarness.setupOutputForSingletonOperatorChain();
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
 			StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 4d24b82..9897884 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -44,6 +44,7 @@ public class TestProcessingTimeServiceTest {
 
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
 			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index f84836b..9ddea8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;