You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/22 17:20:49 UTC

[2/2] flink git commit: [FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

This closes #3347.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0f0f372
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0f0f372
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0f0f372

Branch: refs/heads/master
Commit: b0f0f3722fac4726fba879736c7ee85993b392db
Parents: 646490c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Feb 17 02:43:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Feb 23 01:19:58 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   |   7 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |   5 +
 .../AbstractFetcherTimestampsTest.java          |   6 +
 .../connectors/rabbitmq/RMQSourceTest.java      |   6 +
 .../flink/storm/wrappers/TestContext.java       |   7 +-
 .../hdfstests/ContinuousFileProcessingTest.java |   4 +
 .../source/ContinuousFileReaderOperator.java    |   8 +-
 .../api/functions/source/SourceFunction.java    |  14 +
 .../streaming/api/operators/StreamSource.java   |  22 +-
 .../api/operators/StreamSourceContexts.java     | 350 +++++++++++++++----
 .../runtime/io/StreamInputProcessor.java        |  10 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  20 +-
 .../streamstatus/StreamStatusMaintainer.java    |  36 ++
 .../runtime/tasks/OneInputStreamTask.java       |   6 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   6 +-
 .../runtime/tasks/SourceStreamTask.java         |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |   5 +
 .../runtime/tasks/TwoInputStreamTask.java       |   6 +-
 .../api/functions/ListSourceContext.java        |   7 +-
 .../functions/StatefulSequenceSourceTest.java   |   6 +
 .../source/FileMonitoringFunctionTest.java      |   5 +-
 .../source/InputFormatSourceFunctionTest.java   |   5 +
 .../source/SocketTextStreamFunctionTest.java    |  11 +-
 .../AbstractUdfStreamOperatorLifecycleTest.java |   7 +-
 .../StreamSourceContextIdleDetectionTests.java  | 325 +++++++++++++++++
 .../operators/StreamSourceOperatorTest.java     |  25 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   5 +-
 .../util/AbstractStreamOperatorTestHarness.java |  19 +
 .../streaming/util/CollectingSourceContext.java |   5 +
 29 files changed, 814 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 3bc154e..5718986 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -447,7 +447,12 @@ public class Kafka010FetcherTest {
             block();
         }
 
-        @Override
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
         public Object getCheckpointLock() {
             return new Object();
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 4526aa0..abd75cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -447,6 +447,11 @@ public class Kafka09FetcherTest {
 		}
 
 		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public Object getCheckpointLock() {
 			return new Object();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index f2091f0..6887518 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -271,6 +271,12 @@ public class AbstractFetcherTimestampsTest {
 			}
 		}
 
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
 		@Override
 		public Object getCheckpointLock() {
 			return checkpointLock;

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 8474f8a..26434ed 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -443,6 +443,12 @@ public class RMQSourceTest {
 
 		@Override
 		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
index 4c4749a..58aad7b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -41,7 +41,12 @@ class TestContext implements SourceContext<Tuple1<Integer>> {
 
 	@Override
 	public void emitWatermark(Watermark mark) {
-		// ignore it
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void markAsTemporarilyIdle() {
+		throw new UnsupportedOperationException();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index cc5cb8e..f579345 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -1001,6 +1001,10 @@ public class ContinuousFileProcessingTest {
 		}
 
 		@Override
+		public void markAsTemporarilyIdle() {
+		}
+
+		@Override
 		public Object getCheckpointLock() {
 			return lock;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ab1ad1d..b86d97c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -136,7 +136,13 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 		this.readerContext = StreamSourceContexts.getSourceContext(
-			timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
+			timeCharacteristic,
+			getProcessingTimeService(),
+			checkpointLock,
+			getContainingTask().getStreamStatusMaintainer(),
+			output,
+			watermarkInterval,
+			-1);
 
 		// and initialize the split reading thread
 		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index f1619b2..fc7f793 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -216,6 +216,20 @@ public interface SourceFunction<T> extends Function, Serializable {
 		@PublicEvolving
 		void emitWatermark(Watermark mark);
 
+		/**
+		 * Marks the source to be temporarily idle. This tells the system that this source will
+		 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
+		 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
+		 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
+		 * watermarks without the need to wait for watermarks from this source while it is idle.
+		 *
+		 * <p>Source functions should make a best effort to call this method as soon as they
+		 * recognize that they are idle. The system will consider the source to have resumed activity
+		 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
+		 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
+		 */
+		@PublicEvolving
+		void markAsTemporarilyIdle();
 
 		/**
 		 * Returns the checkpoint lock. Please refer to the class-level comment in

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84330b6..36f7c6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
@@ -51,12 +52,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
-	public void run(final Object lockingObject) throws Exception {
-		run(lockingObject, output);
+	public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
+		run(lockingObject, streamStatusMaintainer, output);
 	}
 
 	
-	public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
+	public void run(final Object lockingObject,
+			final StreamStatusMaintainer streamStatusMaintainer,
+			final Output<StreamRecord<OUT>> collector) throws Exception {
+
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
 
 		LatencyMarksEmitter latencyEmitter = null;
@@ -68,11 +72,17 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 				getOperatorConfig().getVertexID(),
 				getRuntimeContext().getIndexOfThisSubtask());
 		}
-		
+
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
 		this.ctx = StreamSourceContexts.getSourceContext(
-			timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
+			timeCharacteristic,
+			getProcessingTimeService(),
+			lockingObject,
+			streamStatusMaintainer,
+			collector,
+			watermarkInterval,
+			-1);
 
 		try {
 			userFunction.run(ctx);
@@ -108,7 +118,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	/**
 	 * Marks this source as canceled or stopped.
 	 * 
-	 * <p>This indicates that any exit of the {@link #run(Object, Output)} method
+	 * <p>This indicates that any exit of the {@link #run(Object, StreamStatusMaintainer, Output)} method
 	 * cannot be interpreted as the result of a finite source.  
 	 */
 	protected void markCanceledOrStopped() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index a6a273f..98281c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -42,16 +44,34 @@ public class StreamSourceContexts {
 	 * </ul>
 	 * */
 	public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
-			TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService,
-			Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
+			TimeCharacteristic timeCharacteristic,
+			ProcessingTimeService processingTimeService,
+			Object checkpointLock,
+			StreamStatusMaintainer streamStatusMaintainer,
+			Output<StreamRecord<OUT>> output,
+			long watermarkInterval,
+			long idleTimeout) {
 
 		final SourceFunction.SourceContext<OUT> ctx;
 		switch (timeCharacteristic) {
 			case EventTime:
-				ctx = new ManualWatermarkContext<>(checkpointLock, output);
+				ctx = new ManualWatermarkContext<>(
+					output,
+					processingTimeService,
+					checkpointLock,
+					streamStatusMaintainer,
+					idleTimeout);
+
 				break;
 			case IngestionTime:
-				ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval);
+				ctx = new AutomaticWatermarkContext<>(
+					output,
+					watermarkInterval,
+					processingTimeService,
+					checkpointLock,
+					streamStatusMaintainer,
+					idleTimeout);
+
 				break;
 			case ProcessingTime:
 				ctx = new NonTimestampContext<>(checkpointLock, output);
@@ -97,6 +117,11 @@ public class StreamSourceContexts {
 		}
 
 		@Override
+		public void markAsTemporarilyIdle() {
+			// do nothing
+		}
+
+		@Override
 		public Object getCheckpointLock() {
 			return lock;
 		}
@@ -109,10 +134,8 @@ public class StreamSourceContexts {
 	 * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
 	 * and watermark emission.
 	 */
-	private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+	private static class AutomaticWatermarkContext<T> extends WatermarkContext<T> {
 
-		private final ProcessingTimeService timeService;
-		private final Object lock;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 
@@ -121,14 +144,18 @@ public class StreamSourceContexts {
 		private volatile ScheduledFuture<?> nextWatermarkTimer;
 		private volatile long nextWatermarkTime;
 
+		private long lastRecordTime;
+
 		private AutomaticWatermarkContext(
-			final ProcessingTimeService timeService,
-			final Object checkpointLock,
-			final Output<StreamRecord<T>> output,
-			final long watermarkInterval) {
+				final Output<StreamRecord<T>> output,
+				final long watermarkInterval,
+				final ProcessingTimeService timeService,
+				final Object checkpointLock,
+				final StreamStatusMaintainer streamStatusMaintainer,
+				final long idleTimeout) {
+
+			super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
 
-			this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
-			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
 			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
 
 			Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
@@ -136,63 +163,62 @@ public class StreamSourceContexts {
 
 			this.reuse = new StreamRecord<>(null);
 
+			this.lastRecordTime = Long.MIN_VALUE;
+
 			long now = this.timeService.getCurrentProcessingTime();
 			this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
-				new WatermarkEmittingTask(this.timeService, lock, output));
+				new WatermarkEmittingTask(this.timeService, checkpointLock, output));
 		}
 
 		@Override
-		public void collect(T element) {
-			synchronized (lock) {
-				final long currentTime = this.timeService.getCurrentProcessingTime();
-				output.collect(reuse.replace(element, currentTime));
-
-				// this is to avoid lock contention in the lockingObject by
-				// sending the watermark before the firing of the watermark
-				// emission task.
-
-				if (currentTime > nextWatermarkTime) {
-					// in case we jumped some watermarks, recompute the next watermark time
-					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-					nextWatermarkTime = watermarkTime + watermarkInterval;
-					output.emitWatermark(new Watermark(watermarkTime));
-
-					// we do not need to register another timer here
-					// because the emitting task will do so.
-				}
+		protected void processAndCollect(T element) {
+			lastRecordTime = this.timeService.getCurrentProcessingTime();
+			output.collect(reuse.replace(element, lastRecordTime));
+
+			// this is to avoid lock contention in the lockingObject by
+			// sending the watermark before the firing of the watermark
+			// emission task.
+			if (lastRecordTime > nextWatermarkTime) {
+				// in case we jumped some watermarks, recompute the next watermark time
+				final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
+				nextWatermarkTime = watermarkTime + watermarkInterval;
+				output.emitWatermark(new Watermark(watermarkTime));
+
+				// we do not need to register another timer here
+				// because the emitting task will do so.
 			}
 		}
 
 		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			collect(element);
+		protected void processAndCollectWithTimestamp(T element, long timestamp) {
+			processAndCollect(element);
 		}
 
 		@Override
-		public void emitWatermark(Watermark mark) {
-
-			if (mark.getTimestamp() == Long.MAX_VALUE) {
-				// allow it since this is the special end-watermark that for example the Kafka source emits
-				synchronized (lock) {
-					nextWatermarkTime = Long.MAX_VALUE;
-					output.emitWatermark(mark);
-				}
-
-				// we can shutdown the timer now, no watermarks will be needed any more
-				final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
-				if (nextWatermarkTimer != null) {
-					nextWatermarkTimer.cancel(true);
-				}
-			}
+		protected boolean allowWatermark(Watermark mark) {
+			// allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
+			return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
 		}
 
+		/** This will only be called if allowWatermark returned {@code true} */
 		@Override
-		public Object getCheckpointLock() {
-			return lock;
+		protected void processAndEmitWatermark(Watermark mark) {
+			nextWatermarkTime = Long.MAX_VALUE;
+			output.emitWatermark(mark);
+
+			// we can shutdown the watermark timer now, no watermarks will be needed any more.
+			// Note that this procedure actually doesn't need to be synchronized with the lock,
+			// but since it's only a one-time thing, doesn't hurt either
+			final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
+			if (nextWatermarkTimer != null) {
+				nextWatermarkTimer.cancel(true);
+			}
 		}
 
 		@Override
 		public void close() {
+			super.close();
+
 			final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
 			if (nextWatermarkTimer != null) {
 				nextWatermarkTimer.cancel(true);
@@ -218,14 +244,23 @@ public class StreamSourceContexts {
 			public void onProcessingTime(long timestamp) {
 				final long currentTime = timeService.getCurrentProcessingTime();
 
-				if (currentTime > nextWatermarkTime) {
-					// align the watermarks across all machines. this will ensure that we
-					// don't have watermarks that creep along at different intervals because
-					// the machine clocks are out of sync
-					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+				synchronized (lock) {
+					// we should continue to automatically emit watermarks if we are active
+					if (streamStatusMaintainer.getStreamStatus().isActive()) {
+						if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) {
+							// if we are configured to detect idleness, piggy-back the idle detection check on the
+							// watermark interval, so that we may possibly discover idle sources faster before waiting
+							// for the next idle check to fire
+							markAsTemporarilyIdle();
+
+							// no need to finish the next check, as we are now idle.
+							cancelNextIdleDetectionTask();
+						} else if (currentTime > nextWatermarkTime) {
+							// align the watermarks across all machines. this will ensure that we
+							// don't have watermarks that creep along at different intervals because
+							// the machine clocks are out of sync
+							final long watermarkTime = currentTime - (currentTime % watermarkInterval);
 
-					synchronized (lock) {
-						if (currentTime > nextWatermarkTime) {
 							output.emitWatermark(new Watermark(watermarkTime));
 							nextWatermarkTime = watermarkTime + watermarkInterval;
 						}
@@ -247,45 +282,220 @@ public class StreamSourceContexts {
 	 * Streaming topologies can use timestamp assigner functions to override the timestamps
 	 * assigned here.
 	 */
-	private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+	private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
 
-		private final Object lock;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 
-		private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
-			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+		private ManualWatermarkContext(
+				final Output<StreamRecord<T>> output,
+				final ProcessingTimeService timeService,
+				final Object checkpointLock,
+				final StreamStatusMaintainer streamStatusMaintainer,
+				final long idleTimeout) {
+
+			super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
+
 			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
 			this.reuse = new StreamRecord<>(null);
 		}
 
 		@Override
+		protected void processAndCollect(T element) {
+			output.collect(reuse.replace(element));
+		}
+
+		@Override
+		protected void processAndCollectWithTimestamp(T element, long timestamp) {
+			output.collect(reuse.replace(element, timestamp));
+		}
+
+		@Override
+		protected void processAndEmitWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		protected boolean allowWatermark(Watermark mark) {
+			return true;
+		}
+	}
+
+	/**
+	 * An abstract {@link SourceFunction.SourceContext} that should be used as the base for
+	 * stream source contexts that deal with {@link Watermark}s.
+	 *
+	 * Stream source contexts that deal with watermarks are responsible for manipulating
+	 * the current {@link StreamStatus}, so that stream status can be correctly propagated
+	 * downstream. Please refer to the class-level documentation of {@link StreamStatus} for
+	 * information on how stream status affects watermark advancement at downstream tasks.
+	 *
+	 * This class implements the logic of idleness detection. It fires idleness detection
+	 * tasks at a given interval; if no records or watermarks were collected by the source context
+	 * between 2 consecutive checks, it determines the source to be IDLE and correspondingly
+	 * toggles the status. ACTIVE status resumes as soon as some record or watermark is collected
+	 * again.
+	 */
+	private static abstract class WatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+		protected final ProcessingTimeService timeService;
+		protected final Object checkpointLock;
+		protected final StreamStatusMaintainer streamStatusMaintainer;
+		protected final long idleTimeout;
+
+		private ScheduledFuture<?> nextCheck;
+
+		/**
+		 * This flag will be reset to {@code true} every time the next check is scheduled.
+		 * Whenever a record or watermark is collected, the flag will be set to {@code false}.
+		 *
+		 * When the scheduled check is fired, if the flag remains to be {@code true}, the check will fail,
+		 * and our current status will be determined to be IDLE.
+		 */
+		private volatile boolean failOnNextCheck;
+
+		/**
+		 * Create a watermark context.
+		 *
+		 * @param timeService the time service to schedule idleness detection tasks
+		 * @param checkpointLock the checkpoint lock
+		 * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status
+		 * @param idleTimeout (-1 if idleness checking is disabled)
+		 */
+		public WatermarkContext(
+				final ProcessingTimeService timeService,
+				final Object checkpointLock,
+				final StreamStatusMaintainer streamStatusMaintainer,
+				final long idleTimeout) {
+
+			this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+			this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
+			this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null.");
+
+			if (idleTimeout != -1) {
+				Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms.");
+			}
+			this.idleTimeout = idleTimeout;
+
+			scheduleNextIdleDetectionTask();
+		}
+
+		@Override
 		public void collect(T element) {
-			synchronized (lock) {
-				output.collect(reuse.replace(element));
+			synchronized (checkpointLock) {
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+				if (nextCheck != null) {
+					this.failOnNextCheck = false;
+				} else {
+					scheduleNextIdleDetectionTask();
+				}
+
+				processAndCollect(element);
 			}
 		}
 
 		@Override
 		public void collectWithTimestamp(T element, long timestamp) {
-			synchronized (lock) {
-				output.collect(reuse.replace(element, timestamp));
+			synchronized (checkpointLock) {
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+				if (nextCheck != null) {
+					this.failOnNextCheck = false;
+				} else {
+					scheduleNextIdleDetectionTask();
+				}
+
+				processAndCollectWithTimestamp(element, timestamp);
 			}
 		}
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			synchronized (lock) {
-				output.emitWatermark(mark);
+			if (allowWatermark(mark)) {
+				synchronized (checkpointLock) {
+					streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+					if (nextCheck != null) {
+						this.failOnNextCheck = false;
+					} else {
+						scheduleNextIdleDetectionTask();
+					}
+
+					processAndEmitWatermark(mark);
+				}
+			}
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			synchronized (checkpointLock) {
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
 			}
 		}
 
 		@Override
 		public Object getCheckpointLock() {
-			return lock;
+			return checkpointLock;
 		}
 
 		@Override
-		public void close() {}
+		public void close() {
+			cancelNextIdleDetectionTask();
+		}
+
+		private class IdlenessDetectionTask implements ProcessingTimeCallback {
+			@Override
+			public void onProcessingTime(long timestamp) throws Exception {
+				synchronized (checkpointLock) {
+					// set this to null now;
+					// the next idleness detection will be scheduled again
+					// depending on the below failOnNextCheck condition
+					nextCheck = null;
+
+					if (failOnNextCheck) {
+						markAsTemporarilyIdle();
+					} else {
+						scheduleNextIdleDetectionTask();
+					}
+				}
+			}
+		}
+
+		private void scheduleNextIdleDetectionTask() {
+			if (idleTimeout != -1) {
+				// reset flag; if it remains true when task fires, we have detected idleness
+				failOnNextCheck = true;
+				nextCheck = this.timeService.registerTimer(
+					this.timeService.getCurrentProcessingTime() + idleTimeout,
+					new IdlenessDetectionTask());
+			}
+		}
+
+		protected void cancelNextIdleDetectionTask() {
+			final ScheduledFuture<?> nextCheck = this.nextCheck;
+			if (nextCheck != null) {
+				nextCheck.cancel(true);
+			}
+		}
+
+		// ------------------------------------------------------------------------
+		//	Abstract methods for concrete subclasses to implement.
+		//  These methods are guaranteed to be synchronized on the checkpoint lock,
+		//  so implementations don't need to do so.
+		// ------------------------------------------------------------------------
+
+		/** Process and collect record. */
+		protected abstract void processAndCollect(T element);
+
+		/** Process and collect record with timestamp. */
+		protected abstract void processAndCollectWithTimestamp(T element, long timestamp);
+
+		/** Whether or not a watermark should be allowed */
+		protected abstract boolean allowWatermark(Watermark mark);
+
+		/** Process and emit watermark. Only called if {@link WatermarkContext#allowWatermark(Watermark)} returns {@code true} */
+		protected abstract void processAndEmitWatermark(Watermark mark);
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index e2061c3..3feaa52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -49,7 +49,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -95,7 +95,7 @@ public class StreamInputProcessor<IN> {
 	 */
 	private int currentChannel = -1;
 
-	private final OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain;
+	private final StreamStatusMaintainer streamStatusMaintainer;
 	
 	private final OneInputStreamOperator<IN, ?> streamOperator;
 
@@ -115,7 +115,7 @@ public class StreamInputProcessor<IN> {
 			Object lock,
 			IOManager ioManager,
 			Configuration taskManagerConfig,
-			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain,
+			StreamStatusMaintainer streamStatusMaintainer,
 			OneInputStreamOperator<IN, ?> streamOperator) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
@@ -157,7 +157,7 @@ public class StreamInputProcessor<IN> {
 
 		this.lastEmittedWatermark = Long.MIN_VALUE;
 
-		this.operatorChain = checkNotNull(operatorChain);
+		this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
 		this.streamOperator = checkNotNull(streamOperator);
 
 		this.statusWatermarkValve = new StatusWatermarkValve(
@@ -297,7 +297,7 @@ public class StreamInputProcessor<IN> {
 		public void handleStreamStatus(StreamStatus streamStatus) {
 			try {
 				synchronized (lock) {
-					operatorChain.setStreamStatus(streamStatus);
+					streamStatusMaintainer.toggleStreamStatus(streamStatus);
 				}
 			} catch (Exception e) {
 				throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a295395..a8ec972 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -45,7 +45,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -107,7 +107,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	 */
 	private int currentChannel = -1;
 
-	private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain;
+	private final StreamStatusMaintainer streamStatusMaintainer;
 
 	private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
 
@@ -129,7 +129,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			Object lock,
 			IOManager ioManager,
 			Configuration taskManagerConfig,
-			OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain,
+			StreamStatusMaintainer streamStatusMaintainer,
 			TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException {
 
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
@@ -185,7 +185,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		this.firstStatus = StreamStatus.ACTIVE;
 		this.secondStatus = StreamStatus.ACTIVE;
 
-		this.operatorChain = checkNotNull(operatorChain);
+		this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
 		this.streamOperator = checkNotNull(streamOperator);
 
 		this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock));
@@ -355,13 +355,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					firstStatus = streamStatus;
 
 					// check if we need to toggle the task's stream status
-					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+					if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
 						if (streamStatus.isActive()) {
 							// we're no longer idle if at least one input has become active
-							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+							streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
 						} else if (secondStatus.isIdle()) {
 							// we're idle once both inputs are idle
-							operatorChain.setStreamStatus(StreamStatus.IDLE);
+							streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
 						}
 					}
 				}
@@ -399,13 +399,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					secondStatus = streamStatus;
 
 					// check if we need to toggle the task's stream status
-					if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+					if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
 						if (streamStatus.isActive()) {
 							// we're no longer idle if at least one input has become active
-							operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+							streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
 						} else if (firstStatus.isIdle()) {
 							// we're idle once both inputs are idle
-							operatorChain.setStreamStatus(StreamStatus.IDLE);
+							streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
new file mode 100644
index 0000000..d964cef
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface that allows toggling the current {@link StreamStatus} as well as retrieving it.
+ */
+@Internal
+public interface StreamStatusMaintainer extends StreamStatusProvider {
+
+	/**
+	 * Toggles the current stream status. This method should only have effect
+	 * if the supplied stream status is different from the current status.
+	 *
+	 * @param streamStatus the new status to toggle to
+	 */
+	void toggleStreamStatus(StreamStatus streamStatus);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index e559ad0..e04d316 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -42,10 +42,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 
-			@SuppressWarnings("unchecked")
-			OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain =
-					(OperatorChain) this.operatorChain;
-
 			inputProcessor = new StreamInputProcessor<>(
 					inputGates,
 					inSerializer,
@@ -54,7 +50,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					getCheckpointLock(),
 					getEnvironment().getIOManager(),
 					getEnvironment().getTaskManagerInfo().getConfiguration(),
-					operatorChain,
+					getStreamStatusMaintainer(),
 					this.headOperator);
 
 			// make sure that stream tasks report their I/O statistics

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 591ed3c..4f07182 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
@@ -63,7 +64,7 @@ import java.util.Random;
  *              head operator.
  */
 @Internal
-public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusProvider {
+public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
 	
@@ -151,7 +152,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		return streamStatus;
 	}
 
-	public void setStreamStatus(StreamStatus status) throws IOException {
+	@Override
+	public void toggleStreamStatus(StreamStatus status) {
 		if (!status.equals(this.streamStatus)) {
 			this.streamStatus = status;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 7ae99f6..63b40ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void run() throws Exception {
-		headOperator.run(getCheckpointLock());
+		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 60afd60..62cfb8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
@@ -497,6 +498,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return accumulatorMap;
 	}
 
+	public StreamStatusMaintainer getStreamStatusMaintainer() {
+		return operatorChain;
+	}
+
 	Output<StreamRecord<OUT>> getHeadOutput() {
 		return operatorChain.getChainEntryPoint();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 175bd76..71346b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -65,10 +65,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 			}
 		}
 
-		@SuppressWarnings("unchecked")
-		OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain =
-				(OperatorChain) this.operatorChain;
-
 		this.inputProcessor = new StreamTwoInputProcessor<>(
 				inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
@@ -77,7 +73,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 				getCheckpointLock(),
 				getEnvironment().getIOManager(),
 				getEnvironment().getTaskManagerInfo().getConfiguration(),
-				operatorChain,
+				getStreamStatusMaintainer(),
 				this.headOperator);
 
 		// make sure that stream tasks report their I/O statistics

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e4dadf0..a4c1bea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -67,7 +67,12 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
 
 	@Override
 	public void emitWatermark(Watermark mark) {
-		// don't do anything
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void markAsTemporarilyIdle() {
+		throw new UnsupportedOperationException();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index 8332cb3..9030e9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -228,6 +228,12 @@ public class StatefulSequenceSourceTest {
 
 		@Override
 		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
index 6b36419..d81b440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -58,10 +58,13 @@ public class FileMonitoringFunctionTest {
 					public void emitWatermark(Watermark mark) {}
 
 					@Override
+					public void markAsTemporarilyIdle() {}
+
+					@Override
 					public Object getCheckpointLock() { return null; }
 
 					@Override
 					public void close() {}
 				});
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index d1131b4..bb80228 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -225,6 +225,11 @@ public class InputFormatSourceFunctionTest {
 		}
 
 		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public Object getCheckpointLock() {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 3e274cf..87376e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -283,7 +283,14 @@ public class SocketTextStreamFunctionTest {
 					}
 
 					@Override
-					public void emitWatermark(Watermark mark) {}
+					public void emitWatermark(Watermark mark) {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
+					public void markAsTemporarilyIdle() {
+						throw new UnsupportedOperationException();
+					}
 
 					@Override
 					public Object getCheckpointLock() {
@@ -346,4 +353,4 @@ public class SocketTextStreamFunctionTest {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 357163c..c4ddea8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
@@ -219,9 +220,11 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
-		public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception {
+		public void run(Object lockingObject,
+						StreamStatusMaintainer streamStatusMaintainer,
+						Output<StreamRecord<OUT>> collector) throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
-			super.run(lockingObject, collector);
+			super.run(lockingObject, streamStatusMaintainer, collector);
 			runStarted.trigger();
 			runFinish.await();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
new file mode 100644
index 0000000..3695120
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class StreamSourceContextIdleDetectionTests {
+
+	/** The tests in this class will be parameterized with these enumerations. */
+	private enum TestMethod {
+
+		/** test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method */
+		COLLECT,
+
+		/** test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */
+		COLLECT_WITH_TIMESTAMP,
+
+		/** test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method */
+		EMIT_WATERMARK
+	}
+
+	private TestMethod testMethod;
+
+	public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
+		this.testMethod = testMethod;
+	}
+
+	/**
+	 * Test scenario (idleTimeout = 100):
+	 * (1) Start from 0 as initial time.
+	 * (2) As soon as time reaches 100, status should have been toggled to IDLE.
+	 * (3) After some arbitrary time (until 300), the status should remain IDLE.
+	 * (4) Emit a record at 310. Status should become ACTIVE. This should fire an idleness detection at 410.
+	 * (5) Emit another record at 320 (which is before the next check). This should make the idleness check pass.
+	 * (6) Advance time to 410 and trigger idleness detection.
+	 *     The status should still be ACTIVE due to step (5). Another idleness detection should be fired at 510.
+	 * (7) Advance time to 510 and trigger idleness detection. Since no records were collected in-between the two
+	 *     idleness detections, status should have been toggled back to IDLE.
+	 *
+	 * Inline comments will refer to the corresponding tested steps in the scenario.
+	 */
+	@Test
+	public void testManualWatermarkContext() throws Exception {
+		long idleTimeout = 100;
+
+		long initialTime = 0;
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		processingTimeService.setCurrentTime(initialTime);
+
+		final List<StreamElement> output = new ArrayList<>();
+
+		MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer();
+
+		SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
+			TimeCharacteristic.EventTime,
+			processingTimeService,
+			new Object(),
+			mockStreamStatusMaintainer,
+			new CollectorOutput<String>(output),
+			0,
+			idleTimeout);
+
+		// -------------------------- begin test scenario --------------------------
+
+		// corresponds to step (2) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + idleTimeout);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+		// corresponds to step (3) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+		// corresponds to step (4) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+		switch (testMethod) {
+			case COLLECT:
+				context.collect("msg");
+				break;
+			case COLLECT_WITH_TIMESTAMP:
+				context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+				break;
+			case EMIT_WATERMARK:
+				context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+				break;
+		}
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+		// corresponds to step (5) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 2*idleTimeout/10);
+		switch (testMethod) {
+			case COLLECT:
+				context.collect("msg");
+				break;
+			case COLLECT_WITH_TIMESTAMP:
+				context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+				break;
+			case EMIT_WATERMARK:
+				context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+				break;
+		}
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+		// corresponds to step (6) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+		// corresponds to step (7) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 5*idleTimeout + idleTimeout/10);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+	}
+
+	/**
+	 * Test scenario (idleTimeout = 100, watermarkInterval = 40):
+	 * (1) Start from 20 as initial time.
+	 * (2) As soon as time reaches 120, status should have been toggled to IDLE.
+	 * (3) After some arbitrary time (until 320), the status should remain IDLE, and no watermarks should have been emitted.
+	 * (4) Emit a record at 330. Status should become ACTIVE. This should schedule an idleness detection to be fired at 430.
+	 * (5) Emit another record at 350 (which is before the next check). This should make the idleness check pass.
+	 * (6) Advance time to 430 and trigger idleness detection. The status should still be ACTIVE due to step (5).
+	 *     This should schedule an idleness detection to be fired at 530.
+	 * (7) Advance time to 460, in which a watermark emission task should be fired. Idleness detection
+	 *     should have been "piggy-backed" in the task, allowing the status to be toggled to IDLE before the next
+	 *     actual idle detection task at 530.
+	 *
+	 * Inline comments will refer to the corresponding tested steps in the scenario.
+	 */
+	@Test
+	public void testAutomaticWatermarkContext() throws Exception {
+		long watermarkInterval = 40;
+		long idleTimeout = 100;
+		long initialTime = 20;
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		processingTimeService.setCurrentTime(initialTime);
+
+		MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer();
+
+		final List<StreamElement> output = new ArrayList<>();
+		final List<StreamElement> expectedOutput = new ArrayList<>();
+
+		SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
+			TimeCharacteristic.IngestionTime,
+			processingTimeService,
+			new Object(),
+			mockStreamStatusMaintainer,
+			new CollectorOutput<String>(output),
+			watermarkInterval,
+			idleTimeout);
+
+		// -------------------------- begin test scenario --------------------------
+
+		// corresponds to step (2) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + watermarkInterval);
+		expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+		processingTimeService.setCurrentTime(initialTime + 2*watermarkInterval);
+		expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+		processingTimeService.setCurrentTime(initialTime + idleTimeout);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+		assertEquals(expectedOutput, output);
+
+		// corresponds to step (3) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 3*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 4*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 6*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 7*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+		assertEquals(expectedOutput, output);
+
+		// corresponds to step (4) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+		switch (testMethod) {
+			case COLLECT:
+				context.collect("msg");
+				expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+				expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case COLLECT_WITH_TIMESTAMP:
+				context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+				expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+				expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case EMIT_WATERMARK:
+				// for emitWatermark, since the watermark will be blocked,
+				// it should not make the status become active;
+				// from here on, the status should remain idle for the emitWatermark variant test
+				context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+				assertEquals(expectedOutput, output);
+		}
+
+		// corresponds to step (5) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 8*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 3*idleTimeout/10);
+		switch (testMethod) {
+			case COLLECT:
+				context.collect("msg");
+				expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case COLLECT_WITH_TIMESTAMP:
+				context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+				expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case EMIT_WATERMARK:
+				context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+				assertEquals(expectedOutput, output);
+		}
+
+		processingTimeService.setCurrentTime(initialTime + 9 * watermarkInterval);
+		switch (testMethod) {
+			case COLLECT:
+			case COLLECT_WITH_TIMESTAMP:
+				expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case EMIT_WATERMARK:
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+				assertEquals(expectedOutput, output);
+		}
+
+		processingTimeService.setCurrentTime(initialTime + 10*watermarkInterval);
+		switch (testMethod) {
+			case COLLECT:
+			case COLLECT_WITH_TIMESTAMP:
+				expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case EMIT_WATERMARK:
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+				assertEquals(expectedOutput, output);
+		}
+
+		// corresponds to step (6) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+		switch (testMethod) {
+			case COLLECT:
+			case COLLECT_WITH_TIMESTAMP:
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+				assertEquals(expectedOutput, output);
+				break;
+			case EMIT_WATERMARK:
+				assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+				assertEquals(expectedOutput, output);
+		}
+
+		// corresponds to step (7) of scenario (please see method-level Javadoc comment)
+		processingTimeService.setCurrentTime(initialTime + 11*watermarkInterval);
+		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+		assertEquals(expectedOutput, output);
+	}
+
+	private static class MockStreamStatusMaintainer implements StreamStatusMaintainer {
+		StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+		@Override
+		public void toggleStreamStatus(StreamStatus streamStatus) {
+			if (!currentStreamStatus.equals(streamStatus)) {
+				currentStreamStatus = streamStatus;
+			}
+		}
+
+		@Override
+		public StreamStatus getStreamStatus() {
+			return currentStreamStatus;
+		}
+	}
+
+	@Parameterized.Parameters(name = "TestMethod = {0}")
+	@SuppressWarnings("unchecked")
+	public static Collection<TestMethod[]> timeCharacteristic(){
+		return Arrays.asList(
+			new TestMethod[]{TestMethod.COLLECT},
+			new TestMethod[]{TestMethod.COLLECT_WITH_TIMESTAMP},
+			new TestMethod[]{TestMethod.EMIT_WATERMARK});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index b153de9..ae74c9c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -69,7 +72,7 @@ public class StreamSourceOperatorTest {
 		final List<StreamElement> output = new ArrayList<>();
 		
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 		
 		assertEquals(1, output.size());
 		assertEquals(Watermark.MAX_WATERMARK, output.get(0));
@@ -89,7 +92,7 @@ public class StreamSourceOperatorTest {
 		operator.cancel();
 
 		// run and exit
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 		
 		assertTrue(output.isEmpty());
 	}
@@ -121,7 +124,7 @@ public class StreamSourceOperatorTest {
 		
 		// run and wait to be canceled
 		try {
-			operator.run(new Object(), new CollectorOutput<String>(output));
+			operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 		}
 		catch (InterruptedException ignored) {}
 
@@ -142,7 +145,7 @@ public class StreamSourceOperatorTest {
 		operator.stop();
 
 		// run and stop
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 
 		assertTrue(output.isEmpty());
 	}
@@ -171,7 +174,7 @@ public class StreamSourceOperatorTest {
 		}.start();
 
 		// run and wait to be stopped
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 
 		assertTrue(output.isEmpty());
 	}
@@ -198,7 +201,7 @@ public class StreamSourceOperatorTest {
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
 
 		// run and wait to be stopped
-		operator.run(new Object(), new CollectorOutput<Long>(output));
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
 
 		int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
 
@@ -224,11 +227,6 @@ public class StreamSourceOperatorTest {
 	}
 
 	@Test
-	public void testLatencyMarksEmitterLifecycleIntegration() {
-
-	}
-
-	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
 
 		// regular stream source operator
@@ -246,8 +244,10 @@ public class StreamSourceOperatorTest {
 		StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
 			operator.getContainingTask().getProcessingTimeService(),
 			operator.getContainingTask().getCheckpointLock(),
+			operator.getContainingTask().getStreamStatusMaintainer(),
 			new CollectorOutput<String>(output),
-			operator.getExecutionConfig().getAutoWatermarkInterval());
+			operator.getExecutionConfig().getAutoWatermarkInterval(),
+			-1);
 
 		// periodically emit the watermarks
 		// even though we start from 1 the watermark are still
@@ -302,6 +302,7 @@ public class StreamSourceOperatorTest {
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+		when(mockTask.getStreamStatusMaintainer()).thenReturn(mock(StreamStatusMaintainer.class));
 
 		doAnswer(new Answer<ProcessingTimeService>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d33d1b6..1e74c3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -84,6 +84,7 @@ import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -759,7 +760,9 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
+		public void run(Object lockingObject,
+						StreamStatusMaintainer streamStatusMaintainer,
+						Output<StreamRecord<Long>> collector) throws Exception {
 			while (!canceled) {
 				try {
 					Thread.sleep(500);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2df4efd..01afec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -54,6 +54,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -157,6 +159,22 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		processingTimeService = new TestProcessingTimeService();
 		processingTimeService.setCurrentTime(0);
 
+		StreamStatusMaintainer mockStreamStatusMaintainer = new StreamStatusMaintainer() {
+			StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+			@Override
+			public void toggleStreamStatus(StreamStatus streamStatus) {
+				if (!currentStreamStatus.equals(streamStatus)) {
+					currentStreamStatus = streamStatus;
+				}
+			}
+
+			@Override
+			public StreamStatus getStreamStatus() {
+				return currentStreamStatus;
+			}
+		};
+
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
@@ -165,6 +183,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+		when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 
 		doAnswer(new Answer<Void>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index fe2b03e..d9ad24d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -56,6 +56,11 @@ public class CollectingSourceContext<T extends Serializable> implements SourceFu
 	}
 
 	@Override
+	public void markAsTemporarilyIdle() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public Object getCheckpointLock() {
 		return lock;
 	}