You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:16:52 UTC

[02/17] flink git commit: [hotfix] Various code cleanups around time service and asynchronous exceptions

[hotfix] Various code cleanups around time service and asynchronous exceptions

  - DefaultTimeServiceProvider now owns scheduled executor
  - Enforce that an asynchronous exception handler is always set


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954ef08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954ef08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954ef08f

Branch: refs/heads/master
Commit: 954ef08f374d7e7c1f2b469201b1ea05c6376b33
Parents: 8ff451b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 4 16:15:05 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../AbstractFetcherTimestampsTest.java          | 122 +++++++++++--------
 .../kafka/testutils/MockRuntimeContext.java     |  40 +++---
 .../api/operators/StreamSourceContexts.java     |   6 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../runtime/tasks/AsyncExceptionHandler.java    |   1 +
 .../runtime/tasks/AsynchronousException.java    |  11 +-
 .../tasks/DefaultTimeServiceProvider.java       |  57 +++++----
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  18 +--
 .../runtime/tasks/TestTimeServiceProvider.java  |   2 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../runtime/operators/TimeProviderTest.java     |  45 +++++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  84 ++++++++-----
 ...AlignedProcessingTimeWindowOperatorTest.java | 101 +++++++++------
 .../operators/windowing/NoOpTimerService.java   |  49 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java |   6 +-
 16 files changed, 347 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 8c68fbe..c3ba7b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,7 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -34,6 +37,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -110,6 +114,7 @@ public class AbstractFetcherTimestampsTest {
 	
 	@Test
 	public void testPeriodicWatermarks() throws Exception {
+
 		ExecutionConfig config = new ExecutionConfig();
 		config.setAutoWatermarkInterval(10);
 		
@@ -120,61 +125,70 @@ public class AbstractFetcherTimestampsTest {
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
-		TestFetcher<Long> fetcher = new TestFetcher<>(
-				sourceContext, originalPartitions,
-				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
-				null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock()));
-
-		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
-		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
-		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
-
-		// elements generate a watermark if the timestamp is a multiple of three
-
-		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L);
-		fetcher.emitRecord(2L, part1, 2L);
-		fetcher.emitRecord(3L, part1, 3L);
-		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-
-		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L);
-		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock());
 
-		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L);
-		fetcher.emitRecord(102L, part3, 2L);
-		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
-		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
-		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
-		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L);
-		fetcher.emitRecord(1004L, part3, 4L);
-		fetcher.emitRecord(1005L, part3, 5L);
-		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
-		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L);
-		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-		
-		// this blocks until the periodic thread emitted the watermark
-		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
-		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L);
-		fetcher.emitRecord(14L, part2, 3L);
-		fetcher.emitRecord(15L, part2, 3L);
-
-		// this blocks until the periodic thread emitted the watermark
-		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
-		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+		try {
+			TestFetcher<Long> fetcher = new TestFetcher<>(
+					sourceContext, originalPartitions,
+					new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+					null, new MockRuntimeContext(17, 3, config, timerService));
+	
+			final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+			final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+			final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+	
+			// elements generate a watermark if the timestamp is a multiple of three
+	
+			// elements for partition 1
+			fetcher.emitRecord(1L, part1, 1L);
+			fetcher.emitRecord(2L, part1, 2L);
+			fetcher.emitRecord(3L, part1, 3L);
+			assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+	
+			// elements for partition 2
+			fetcher.emitRecord(12L, part2, 1L);
+			assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+	
+			// elements for partition 3
+			fetcher.emitRecord(101L, part3, 1L);
+			fetcher.emitRecord(102L, part3, 2L);
+			assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+	
+			// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+			assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+	
+			// advance partition 3
+			fetcher.emitRecord(1003L, part3, 3L);
+			fetcher.emitRecord(1004L, part3, 4L);
+			fetcher.emitRecord(1005L, part3, 5L);
+			assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+	
+			// advance partition 1 beyond partition 2 - this bumps the watermark
+			fetcher.emitRecord(30L, part1, 4L);
+			assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+			
+			// this blocks until the periodic thread emitted the watermark
+			assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+	
+			// advance partition 2 again - this bumps the watermark
+			fetcher.emitRecord(13L, part2, 2L);
+			fetcher.emitRecord(14L, part2, 3L);
+			fetcher.emitRecord(15L, part2, 3L);
+	
+			// this blocks until the periodic thread emitted the watermark
+			long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+			assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+		}
+		finally {
+			timerService.shutdownService();
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 5be4195..e1ec4cb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -32,45 +32,46 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final int numberOfParallelSubtasks;
 	private final int indexOfThisSubtask;
-	
-	private final ExecutionConfig execConfig;
 
-	private final TimeServiceProvider timerService;
+	private final ExecutionConfig execConfig;
 
+	private final TimeServiceProvider timeServiceProvider;
+	
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object());
+		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig());
 	}
 
 	public MockRuntimeContext(
-		int numberOfParallelSubtasks,
-		int indexOfThisSubtask,
-		ExecutionConfig execConfig,
-		Object checkpointLock) {
-
+			int numberOfParallelSubtasks,
+			int indexOfThisSubtask,
+			ExecutionConfig execConfig) {
+		this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, null);
+	}
+	
+	public MockRuntimeContext(
+			int numberOfParallelSubtasks,
+			int indexOfThisSubtask,
+			ExecutionConfig execConfig,
+			TimeServiceProvider timeServiceProvider) {
+		
 		super(new MockStreamOperator(),
 			new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
 			Collections.<String, Accumulator<?, ?>>emptyMap());
@@ -78,8 +79,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.indexOfThisSubtask = indexOfThisSubtask;
 		this.execConfig = execConfig;
-		this.timerService = DefaultTimeServiceProvider.
-			createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
+		this.timeServiceProvider = timeServiceProvider;
 	}
 
 	@Override
@@ -189,7 +189,11 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	@Override
 	public TimeServiceProvider getTimeServiceProvider() {
-		return timerService;
+		if (timeServiceProvider == null) {
+			throw new UnsupportedOperationException();
+		} else {
+			return timeServiceProvider;
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index abaf4e7..a290deb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -35,9 +35,9 @@ public class StreamSourceContexts {
 	 * Depending on the {@link TimeCharacteristic}, this method will return the adequate
 	 * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is:
 	 * <ul>
-	 * <li> {@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}
-	 * <li> {@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}
-	 * <li> {@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}
+	 *     <li>{@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}</li>
+	 *     <li>{@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}</li>
+	 *     <li>{@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}</li>
 	 * </ul>
 	 * */
 	public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 85e9297..2dbc6d4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -83,7 +83,9 @@ public class StreamInputProcessor<IN> {
 	private Counter numRecordsIn;
 
 	@SuppressWarnings("unchecked")
-	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+	public StreamInputProcessor(
+			InputGate[] inputGates,
+			TypeSerializer<IN> inputSerializer,
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index 4c55055..a8125c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index 311e0cd..cda0511 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -15,22 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 
 /**
- * {@code RuntimeException} for wrapping exceptions that are thrown in Threads that are not the
- * main compute Thread.
+ * An exception for wrapping exceptions that are thrown by an operator in threads other than the
+ * main compute thread of that operator. 
  */
 @Internal
-public class AsynchronousException extends RuntimeException {
+public class AsynchronousException extends Exception {
 	private static final long serialVersionUID = 1L;
 
 	public AsynchronousException(Throwable cause) {
 		super(cause);
 	}
 
+	public AsynchronousException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
 	@Override
 	public String toString() {
 		return "AsynchronousException{" + getCause() + "}";

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 9534b3c..5664eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.util.Preconditions;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
@@ -35,24 +35,34 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	/** The containing task that owns this time service provider. */
 	private final AsyncExceptionHandler task;
 
+	/** The lock that timers acquire upon triggering */
 	private final Object checkpointLock;
 
 	/** The executor service that schedules and calls the triggers of this task*/
-	private final ScheduledExecutorService timerService;
+	private final ScheduledThreadPoolExecutor timerService;
+
 
-	public static DefaultTimeServiceProvider create(
-			AsyncExceptionHandler exceptionHandler,
-			ScheduledExecutorService executor,
-			Object checkpointLock) {
-		return new DefaultTimeServiceProvider(exceptionHandler, executor, checkpointLock);
+	public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
+		this(failureHandler, checkpointLock, null);
 	}
 
-	private DefaultTimeServiceProvider(AsyncExceptionHandler task,
-									ScheduledExecutorService threadPoolExecutor,
-									Object checkpointLock) {
-		this.task = Preconditions.checkNotNull(task);
-		this.timerService = Preconditions.checkNotNull(threadPoolExecutor);
-		this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
+	public DefaultTimeServiceProvider(
+			AsyncExceptionHandler task,
+			Object checkpointLock,
+			ThreadFactory threadFactory) {
+		
+		this.task = checkNotNull(task);
+		this.checkpointLock = checkNotNull(checkpointLock);
+
+		if (threadFactory == null) {
+			this.timerService = new ScheduledThreadPoolExecutor(1);
+		} else {
+			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
+		}
+
+		// allow trigger tasks to be removed if all timers for
+		// that timestamp are removed by user
+		this.timerService.setRemoveOnCancelPolicy(true);
 	}
 
 	@Override
@@ -76,6 +86,13 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 		timerService.shutdownNow();
 	}
 
+	// safety net to destroy the thread pool
+	@Override
+	protected void finalize() throws Throwable {
+		super.finalize();
+		timerService.shutdownNow();
+	}
+
 	/**
 	 * Internal task that is invoked by the timer service and triggers the target.
 	 */
@@ -105,14 +122,4 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 			}
 		}
 	}
-
-	@VisibleForTesting
-	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
-		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
-			@Override
-			public void handleAsyncException(String message, Throwable exception) {
-				exception.printStackTrace();
-			}
-		}, executor, checkpointLock);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index cf8853e..0a6534b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		final Object lock = getCheckpointLock();
 		
 		while (running && inputProcessor.processInput(operator, lock)) {
-
+			// all the work happens in the "processInput" method
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 33317fa..040ec66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -67,7 +67,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -223,15 +223,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 			// if the clock is not already set, then assign a default TimeServiceProvider
 			if (timerService == null) {
+				ThreadFactory timerThreadFactory =
+					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
 
-				ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
-					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
-
-				// allow trigger tasks to be removed if all timers for
-				// that timestamp are removed by user
-				executor.setRemoveOnCancelPolicy(true);
-
-				timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
+				timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
 			}
 
 			operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -305,10 +300,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// stop all timers and threads
 			if (timerService != null) {
 				try {
-					if (!timerService.isTerminated()) {
-						LOG.info("Timer service is shutting down.");
-						timerService.shutdownService();
-					}
+					timerService.shutdownService();
 				}
 				catch (Throwable t) {
 					// catch and log the exception to not replace the original exception

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index a21a2e1..81faec9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -86,7 +86,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 		return isTerminated;
 	}
 
-	public int getNoOfRegisteredTimers() {
+	public int getNumRegisteredTimers() {
 		int count = 0;
 		for (List<Triggerable> tasks: registeredTasks.values()) {
 			count += tasks.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 0197c53..fb08959 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 		final Object lock = getCheckpointLock();
 		
 		while (running && inputProcessor.processInput(operator, lock)) {
-
+			// all the work happens in the "processInput" method
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 0351978..8d3e621 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,13 +39,14 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
+@PrepareForTest({ResultPartitionWriter.class})
 @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class TimeProviderTest {
 
@@ -52,8 +55,10 @@ public class TimeProviderTest {
 		final OneShotLatch latch = new OneShotLatch();
 
 		final Object lock = new Object();
-		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
-			.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		
+		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		final List<Long> timestamps = new ArrayList<>();
 
@@ -114,6 +119,8 @@ public class TimeProviderTest {
 			lastTs = timestamp;
 			counter++;
 		}
+
+		assertNull(error.get());
 	}
 
 	@Test
@@ -124,14 +131,14 @@ public class TimeProviderTest {
 
 		final Object lock = new Object();
 
-		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
-			.create(new AsyncExceptionHandler() {
+		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
+			new AsyncExceptionHandler() {
 				@Override
 				public void handleAsyncException(String message, Throwable exception) {
 					exceptionWasThrown.compareAndSet(false, true);
 					latch.trigger();
 				}
-			}, Executors.newSingleThreadScheduledExecutor(), lock);
+			}, lock);
 
 		long now = System.currentTimeMillis();
 		timeServiceProvider.registerTimer(now, new Triggerable() {
@@ -182,7 +189,7 @@ public class TimeProviderTest {
 			}
 		});
 
-		Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4);
+		Assert.assertEquals(provider.getNumRegisteredTimers(), 4);
 
 		provider.setCurrentTime(100);
 		long seen = 0;
@@ -233,14 +240,30 @@ public class TimeProviderTest {
 			}
 		});
 
-		assertEquals(2, tp.getNoOfRegisteredTimers());
+		assertEquals(2, tp.getNumRegisteredTimers());
 
 		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNoOfRegisteredTimers());
+		assertEquals(1, tp.getNumRegisteredTimers());
 
 		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNoOfRegisteredTimers());
+		assertEquals(0, tp.getNumRegisteredTimers());
 
 		tp.shutdownService();
 	}
+
+	// ------------------------------------------------------------------------
+
+	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
+
+		private final AtomicReference<Throwable> errorReference;
+
+		public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+			this.errorReference = errorReference;
+		}
+
+		@Override
+		public void handleAsyncException(String message, Throwable exception) {
+			errorReference.compareAndSet(null, exception);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 30f38e3..4c6d391 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -58,7 +59,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -182,14 +183,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
-		final Object lock = new Object();
-		TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
-
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final TimeServiceProvider timerService = new NoOpTimerService();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
 
 			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
 
@@ -201,11 +199,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -214,11 +207,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -227,11 +215,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -244,16 +227,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			timerService.shutdownService();
-		}
 	}
 
 	@Test
 	public void testTumblingWindow() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final int windowSize = 50;
@@ -285,6 +267,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
 
 			List<Integer> result = out.getElements();
@@ -294,6 +278,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, result.get(i).intValue());
 			}
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -307,8 +295,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testSlidingWindow() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -335,6 +325,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
 
 			// get and verify the result
@@ -361,6 +353,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					lastCount = 1;
 				}
 			}
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		} finally {
 			timerService.shutdownService();
 		}
@@ -369,8 +365,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testTumblingWindowSingleElements() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -412,7 +410,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -426,8 +430,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+			new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -460,7 +466,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -798,4 +810,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 		return result;
 	}
+
+	private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+		timers.shutdownService();
+
+		while (!timers.isTerminated()) {
+			Thread.sleep(2);
+		}
+	} 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7539c2d..88e28bc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,14 +40,15 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -60,7 +61,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -190,15 +191,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
-		final Object lock = new Object();
-		TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
-
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
+			final TimeServiceProvider timerService = new NoOpTimerService();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+
 			AggregatingProcessingTimeWindowOperator<String, String> op;
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
@@ -209,11 +207,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -222,11 +215,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -235,11 +223,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
-			timerService.shutdownService();
-			timerService = DefaultTimeServiceProvider.createForTesting(
-				Executors.newSingleThreadScheduledExecutor(), lock);
-			mockTask = createMockTaskWithTimer(timerService, lock);
-
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -251,16 +234,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
-			timerService.shutdownService();
 		}
 	}
 
 	@Test
 	public void testTumblingWindowUniqueElements() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final int windowSize = 50;
@@ -297,6 +280,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
 
 
@@ -305,6 +290,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				assertEquals(i, result.get(i).f0.intValue());
 				assertEquals(i, result.get(i).f1.intValue());
 			}
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -318,8 +307,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testTumblingWindowDuplicateElements() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
@@ -364,6 +355,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
 
 			// we have ideally one element per window. we may have more, when we emitted a value into the
@@ -373,6 +366,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// deduplicate for more accurate checks
 			HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
 			assertTrue(set.size() == 10);
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -386,8 +383,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testSlidingWindow() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
@@ -418,6 +417,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
 
 			// get and verify the result
@@ -445,6 +446,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					lastCount = 1;
 				}
 			}
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -458,8 +463,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
@@ -504,7 +511,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			synchronized (lock) {
 				op.close();
 			}
+
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -518,8 +531,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testPropagateExceptionsFromProcessElement() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
-			Executors.newSingleThreadScheduledExecutor(), lock);
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
@@ -556,7 +571,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				assertTrue(e.getMessage().contains("Artificial Test Exception"));
 			}
 
+			shutdownTimerServiceAndWait(timerService);
 			op.dispose();
+
+			if (error.get() != null) {
+				throw new Exception(error.get());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -971,8 +991,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
-		StreamConfig cfg = new StreamConfig(new Configuration());
-		return cfg;
+		return new StreamConfig(new Configuration());
 	}
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
@@ -985,4 +1004,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		}
 		return result;
 	}
+
+	private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+		timers.shutdownService();
+
+		while (!timers.isTerminated()) {
+			Thread.sleep(2);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
new file mode 100644
index 0000000..16e658e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import java.util.concurrent.ScheduledFuture;
+
+class NoOpTimerService extends TimeServiceProvider {
+
+	private volatile boolean terminated;
+	
+	@Override
+	public long getCurrentProcessingTime() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+		return null;
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return terminated;
+	}
+
+	@Override
+	public void shutdownService() throws Exception {
+		terminated = true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index d8a0ee2..9d8e6a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -42,12 +42,12 @@ import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -145,7 +145,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 				@Override
 				public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
 
-					final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0];
+					final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
 					return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
 				}
 			}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
@@ -154,7 +154,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}
 
 		timeServiceProvider = testTimeProvider != null ? testTimeProvider :
-			DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+			new DefaultTimeServiceProvider(mockTask, this.checkpointLock);
 
 		doAnswer(new Answer<TimeServiceProvider>() {
 			@Override