You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:20 UTC

[05/11] flink git commit: [FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService

[FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e112a632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e112a632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e112a632

Branch: refs/heads/master
Commit: e112a63208006b4e348d75f3df84d2fd4b091797
Parents: 71d2e3e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Sep 25 20:58:16 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |   4 +-
 .../connectors/fs/bucketing/BucketingSink.java  |   6 +-
 .../kafka/internals/AbstractFetcher.java        |   8 +-
 .../AbstractFetcherTimestampsTest.java          |   8 +-
 .../kafka/testutils/MockRuntimeContext.java     |   8 +-
 .../source/ContinuousFileReaderOperator.java    |   2 +-
 .../api/operators/AbstractStreamOperator.java   |  17 +-
 .../streaming/api/operators/StreamSource.java   |   2 +-
 .../api/operators/StreamSourceContexts.java     |  14 +-
 .../api/operators/StreamingRuntimeContext.java  |   6 +-
 .../operators/ExtractTimestampsOperator.java    |   8 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |   8 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   6 +-
 .../windowing/EvictingWindowOperator.java       |   2 +-
 .../operators/windowing/WindowOperator.java     |  10 +-
 .../tasks/DefaultTimeServiceProvider.java       | 262 ----------------
 .../runtime/tasks/ProcessingTimeService.java    |  83 +++++
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../tasks/SystemProcessingTimeService.java      | 262 ++++++++++++++++
 .../tasks/TestProcessingTimeService.java        | 172 ++++++++++
 .../runtime/tasks/TestTimeServiceProvider.java  | 172 ----------
 .../runtime/tasks/TimeServiceProvider.java      |  83 -----
 .../operators/StreamSourceOperatorTest.java     |  47 +--
 .../runtime/operators/StreamTaskTimerTest.java  |   6 +-
 .../TestProcessingTimeServiceTest.java          | 113 +++++++
 .../runtime/operators/TestTimeProviderTest.java | 113 -------
 ...stampsAndPeriodicWatermarksOperatorTest.java |  13 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  49 ++-
 ...AlignedProcessingTimeWindowOperatorTest.java |  56 ++--
 .../operators/windowing/NoOpTimerService.java   |   4 +-
 .../operators/windowing/WindowOperatorTest.java |  14 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 313 -------------------
 .../runtime/tasks/StreamTaskTestHarness.java    |   6 +-
 .../tasks/SystemProcessingTimeServiceTest.java  | 313 +++++++++++++++++++
 .../KeyedOneInputStreamOperatorTestHarness.java |   8 +-
 .../flink/streaming/util/MockContext.java       |  23 --
 .../util/OneInputStreamOperatorTestHarness.java |  43 ++-
 .../streaming/util/WindowingTestHarness.java    |   6 +-
 .../runtime/StreamTaskTimerITCase.java          |  10 +-
 39 files changed, 1141 insertions(+), 1155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 36b5c5e..971d5f8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -127,7 +127,7 @@ public class ContinuousFileMonitoringTest {
 		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
 		reader.setOutputType(typeInfo, executionConfig);
 
-		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService();
 		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
 			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
 		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 5a5cade..6f8a739 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -285,7 +285,7 @@ public class BucketingSink<T>
 
 	private transient Clock clock;
 
-	private transient TimeServiceProvider processingTimeService;
+	private transient ProcessingTimeService processingTimeService;
 
 	/**
 	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
@@ -324,7 +324,7 @@ public class BucketingSink<T>
 		refTruncate = reflectTruncate(fs);
 
 		processingTimeService =
-				((StreamingRuntimeContext) getRuntimeContext()).getTimeServiceProvider();
+				((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
 
 		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index eb01b78..065b54f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -118,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> {
 					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
 			
 			PeriodicWatermarkEmitter periodicEmitter = 
-					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getTimeServiceProvider(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
+					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
 			periodicEmitter.start();
 		}
 	}
@@ -466,7 +466,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		
 		private final SourceContext<?> emitter;
 		
-		private final TimeServiceProvider timerService;
+		private final ProcessingTimeService timerService;
 
 		private final long interval;
 		
@@ -477,7 +477,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		PeriodicWatermarkEmitter(
 				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
 				SourceContext<?> emitter,
-				TimeServiceProvider timerService,
+				ProcessingTimeService timerService,
 				long autoWatermarkInterval)
 		{
 			this.allPartitions = checkNotNull(allPartitions);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 7db6ba4..0782cb9 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -128,7 +128,7 @@ public class AbstractFetcherTimestampsTest {
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
 		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock());
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e1ec4cb..f16eacd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -53,7 +53,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final ExecutionConfig execConfig;
 
-	private final TimeServiceProvider timeServiceProvider;
+	private final ProcessingTimeService timeServiceProvider;
 	
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
 		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig());
@@ -70,7 +70,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 			int numberOfParallelSubtasks,
 			int indexOfThisSubtask,
 			ExecutionConfig execConfig,
-			TimeServiceProvider timeServiceProvider) {
+			ProcessingTimeService timeServiceProvider) {
 		
 		super(new MockStreamOperator(),
 			new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
@@ -188,7 +188,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	}
 
 	@Override
-	public TimeServiceProvider getTimeServiceProvider() {
+	public ProcessingTimeService getProcessingTimeService() {
 		if (timeServiceProvider == null) {
 			throw new UnsupportedOperationException();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 769cb6f..be22677 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -107,7 +107,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 		this.readerContext = StreamSourceContexts.getSourceContext(
-			timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval);
+			timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
 
 		// and initialize the split reading thread
 		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9184e93..b789c95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -51,11 +51,12 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,7 +119,7 @@ public abstract class AbstractStreamOperator<OUT>
 
 	/** Keyed state store view on the keyed backend */
 	private transient DefaultKeyedStateStore keyedStateStore;
-	
+
 	/** Operator state backend / store */
 	private transient OperatorStateBackend operatorStateBackend;
 
@@ -246,7 +247,7 @@ public abstract class AbstractStreamOperator<OUT>
 						keySerializer,
 						container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
 						subTaskKeyGroupRange);
-				
+
 				this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
 			}
 
@@ -396,11 +397,11 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	/**
-	 * Returns the {@link TimeServiceProvider} responsible for getting  the current
+	 * Returns the {@link ProcessingTimeService} responsible for getting  the current
 	 * processing time and registering timers.
 	 */
-	protected TimeServiceProvider getTimerService() {
-		return container.getTimerService();
+	protected ProcessingTimeService getProcessingTimeService() {
+		return container.getProcessingTimeService();
 	}
 
 	/**
@@ -421,9 +422,9 @@ public abstract class AbstractStreamOperator<OUT>
 	 */
 	@SuppressWarnings("unchecked")
 	protected <S extends State, N> S getPartitionedState(
-			N namespace, TypeSerializer<N> namespaceSerializer, 
+			N namespace, TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		
+
 		if (keyedStateStore != null) {
 			return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index a07e6b7..5a16db0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -69,7 +69,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
 		this.ctx = StreamSourceContexts.getSourceContext(
-			timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval);
+			timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
 
 		try {
 			userFunction.run(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index d0c4e15..01ae55c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.ScheduledFuture;
@@ -42,7 +42,7 @@ public class StreamSourceContexts {
 	 * </ul>
 	 * */
 	public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
-			TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService,
+			TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService,
 			Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
 
 		final SourceFunction.SourceContext<OUT> ctx;
@@ -51,7 +51,7 @@ public class StreamSourceContexts {
 				ctx = new ManualWatermarkContext<>(checkpointLock, output);
 				break;
 			case IngestionTime:
-				ctx = new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval);
+				ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval);
 				break;
 			case ProcessingTime:
 				ctx = new NonTimestampContext<>(checkpointLock, output);
@@ -111,7 +111,7 @@ public class StreamSourceContexts {
 	 */
 	private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
-		private final TimeServiceProvider timeService;
+		private final ProcessingTimeService timeService;
 		private final Object lock;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
@@ -122,7 +122,7 @@ public class StreamSourceContexts {
 		private volatile long nextWatermarkTime;
 
 		private AutomaticWatermarkContext(
-			final TimeServiceProvider timeService,
+			final ProcessingTimeService timeService,
 			final Object checkpointLock,
 			final Output<StreamRecord<T>> output,
 			final long watermarkInterval) {
@@ -201,12 +201,12 @@ public class StreamSourceContexts {
 
 		private class WatermarkEmittingTask implements Triggerable {
 
-			private final TimeServiceProvider timeService;
+			private final ProcessingTimeService timeService;
 			private final Object lock;
 			private final Output<StreamRecord<T>> output;
 
 			private WatermarkEmittingTask(
-					TimeServiceProvider timeService,
+					ProcessingTimeService timeService,
 					Object checkpointLock,
 					Output<StreamRecord<T>> output) {
 				this.timeService = timeService;

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index cd0489f..fc9e39e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.util.List;
 import java.util.Map;
@@ -77,8 +77,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return taskEnvironment.getInputSplitProvider();
 	}
 
-	public TimeServiceProvider getTimeServiceProvider() {
-		return operator.getTimerService();
+	public ProcessingTimeService getProcessingTimeService() {
+		return operator.getProcessingTimeService();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index c92ff34..0798ed4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -54,8 +54,8 @@ public class ExtractTimestampsOperator<T>
 		super.open();
 		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
 		if (watermarkInterval > 0) {
-			long now = getTimerService().getCurrentProcessingTime();
-			getTimerService().registerTimer(now + watermarkInterval, this);
+			long now = getProcessingTimeService().getCurrentProcessingTime();
+			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 		}
 		currentWatermark = Long.MIN_VALUE;
 	}
@@ -81,8 +81,8 @@ public class ExtractTimestampsOperator<T>
 			output.emitWatermark(new Watermark(currentWatermark));
 		}
 
-		long now = getTimerService().getCurrentProcessingTime();
-		getTimerService().registerTimer(now + watermarkInterval, this);
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index f791723..b1402ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -54,8 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
 		
 		if (watermarkInterval > 0) {
-			long now = getTimerService().getCurrentProcessingTime();
-			getTimerService().registerTimer(now + watermarkInterval, this);
+			long now = getProcessingTimeService().getCurrentProcessingTime();
+			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 		}
 	}
 
@@ -77,8 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 			output.emitWatermark(newWatermark);
 		}
 
-		long now = getTimerService().getCurrentProcessingTime();
-		getTimerService().registerTimer(now + watermarkInterval, this);
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index b39b760..d331d4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -125,7 +125,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		
 		// decide when to first compute the window and when to slide it
 		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
-		final long now = getTimerService().getCurrentProcessingTime();
+		final long now = getProcessingTimeService().getCurrentProcessingTime();
 		nextEvaluationTime = now + windowSlide - (now % windowSlide);
 		nextSlideTime = now + paneSize - (now % paneSize);
 
@@ -166,7 +166,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		}
 
 		// make sure the first window happens
-		getTimerService().registerTimer(firstTriggerTime, this);
+		getProcessingTimeService().registerTimer(firstTriggerTime, this);
 	}
 
 	@Override
@@ -230,7 +230,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		}
 
 		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-		getTimerService().registerTimer(nextTriggerTime, this);
+		getProcessingTimeService().registerTimer(nextTriggerTime, this);
 	}
 	
 	private void computeWindow(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 6609e4d..141b5b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -307,7 +307,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		}
 
 		if (timer != null) {
-			nextTimer = getTimerService().registerTimer(timer.timestamp, this);
+			nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 4d8f655..459c679 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -223,7 +223,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
 			public long getCurrentProcessingTime() {
-				return WindowOperator.this.getTimerService().getCurrentProcessingTime();
+				return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
 			}
 		};
 
@@ -233,7 +233,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		// re-register the restored timers (if any)
 		if (processingTimeTimersQueue.size() > 0) {
-			nextTimer = getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
+			nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
 		}
 	}
 
@@ -495,7 +495,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (timer != null) {
-			nextTimer = getTimerService().registerTimer(timer.timestamp, this);
+			nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
 		}
 	}
 
@@ -697,7 +697,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		@Override
 		public long getCurrentProcessingTime() {
-			return WindowOperator.this.getTimerService().getCurrentProcessingTime();
+			return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
 		}
 
 		@Override
@@ -717,7 +717,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					if (nextTimer != null) {
 						nextTimer.cancel(false);
 					}
-					nextTimer = getTimerService().registerTimer(time, WindowOperator.this);
+					nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
deleted file mode 100644
index d2c743f..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import javax.annotation.Nonnull;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
- * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
- */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
-
-	private static final int STATUS_ALIVE = 0;
-	private static final int STATUS_QUIESCED = 1;
-	private static final int STATUS_SHUTDOWN = 2;
-
-	// ------------------------------------------------------------------------
-
-	/** The containing task that owns this time service provider. */
-	private final AsyncExceptionHandler task;
-
-	/** The lock that timers acquire upon triggering */
-	private final Object checkpointLock;
-
-	/** The executor service that schedules and calls the triggers of this task*/
-	private final ScheduledThreadPoolExecutor timerService;
-
-	private final AtomicInteger status;
-
-
-	public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
-		this(failureHandler, checkpointLock, null);
-	}
-
-	public DefaultTimeServiceProvider(
-			AsyncExceptionHandler task,
-			Object checkpointLock,
-			ThreadFactory threadFactory) {
-
-		this.task = checkNotNull(task);
-		this.checkpointLock = checkNotNull(checkpointLock);
-
-		this.status = new AtomicInteger(STATUS_ALIVE);
-
-		if (threadFactory == null) {
-			this.timerService = new ScheduledThreadPoolExecutor(1);
-		} else {
-			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
-		}
-
-		// tasks should be removed if the future is canceled
-		this.timerService.setRemoveOnCancelPolicy(true);
-
-		// make sure shutdown removes all pending tasks
-		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-	}
-
-	@Override
-	public long getCurrentProcessingTime() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
-		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
-
-		// we directly try to register the timer and only react to the status on exception
-		// that way we save unnecessary volatile accesses for each timer
-		try {
-			return timerService.schedule(
-					new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
-		}
-		catch (RejectedExecutionException e) {
-			final int status = this.status.get();
-			if (status == STATUS_QUIESCED) {
-				return new NeverCompleteFuture(delay);
-			}
-			else if (status == STATUS_SHUTDOWN) {
-				throw new IllegalStateException("Timer service is shut down");
-			}
-			else {
-				// something else happened, so propagate the exception
-				throw e;
-			}
-		}
-	}
-
-	@Override
-	public boolean isTerminated() {
-		return status.get() == STATUS_SHUTDOWN;
-	}
-
-	@Override
-	public void quiesceAndAwaitPending() throws InterruptedException {
-		if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
-			timerService.shutdown();
-
-			// await forever (almost)
-			timerService.awaitTermination(365, TimeUnit.DAYS);
-		}
-	}
-
-	@Override
-	public void shutdownService() {
-		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
-				status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
-		{
-			timerService.shutdownNow();
-		}
-	}
-
-	// safety net to destroy the thread pool
-	@Override
-	protected void finalize() throws Throwable {
-		super.finalize();
-		timerService.shutdownNow();
-	}
-
-	@VisibleForTesting
-	int getNumTasksScheduled() {
-		BlockingQueue<?> queue = timerService.getQueue();
-		if (queue == null) {
-			return 0;
-		} else {
-			return queue.size();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Internal task that is invoked by the timer service and triggers the target.
-	 */
-	private static final class TriggerTask implements Runnable {
-
-		private final Object lock;
-		private final Triggerable target;
-		private final long timestamp;
-		private final AsyncExceptionHandler exceptionHandler;
-
-		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) {
-			this.exceptionHandler = exceptionHandler;
-			this.lock = lock;
-			this.target = target;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public void run() {
-			synchronized (lock) {
-				try {
-					target.trigger(timestamp);
-				} catch (Throwable t) {
-					TimerException asyncException = new TimerException(t);
-					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
-
-		private final Object lock = new Object();
-
-		private final long delayMillis;
-
-		private volatile boolean canceled;
-
-
-		private NeverCompleteFuture(long delayMillis) {
-			this.delayMillis = delayMillis;
-		}
-
-		@Override
-		public long getDelay(@Nonnull TimeUnit unit) {
-			return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
-		}
-
-		@Override
-		public int compareTo(@Nonnull Delayed o) {
-			long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
-			return Long.compare(this.delayMillis, otherMillis);
-		}
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			synchronized (lock) {
-				canceled = true;
-				lock.notifyAll();
-			}
-			return true;
-		}
-
-		@Override
-		public boolean isCancelled() {
-			return canceled;
-		}
-
-		@Override
-		public boolean isDone() {
-			return false;
-		}
-
-		@Override
-		public Object get() throws InterruptedException {
-			synchronized (lock) {
-				while (!canceled) {
-					lock.wait();
-				}
-			}
-			throw new CancellationException();
-		}
-
-		@Override
-		public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
-			synchronized (lock) {
-				while (!canceled) {
-					unit.timedWait(lock, timeout);
-				}
-
-				if (canceled) {
-					throw new CancellationException();
-				} else {
-					throw new TimeoutException();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
new file mode 100644
index 0000000..15c3ebb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Defines the current processing time and handles all related actions,
+ * such as register timers for tasks to be executed in the future.
+ * 
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
+ * whether the timer service has been shut down.
+ * 
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ *     <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
+ *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ *         {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
+ *         return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ *         timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
+ *         will result in a hard exception.</li>
+ * </ol>
+ */
+public abstract class ProcessingTimeService {
+
+	/**
+	 * Returns the current processing time.
+	 */
+	public abstract long getCurrentProcessingTime();
+
+	/**
+	 * Registers a task to be executed when (processing) time is {@code timestamp}.
+	 * 
+	 * @param timestamp   Time when the task is to be executed (in processing time)
+	 * @param target      The task to be executed
+	 * 
+	 * @return The future that represents the scheduled task. This always returns some future,
+	 *         even if the timer was shut down
+	 */
+	public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
+
+	/**
+	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
+	 */
+	public abstract boolean isTerminated();
+
+	/**
+	 * This method puts the service into a state where it does not register new timers, but
+	 * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
+	 * Furthermore, the method clears all not yet started timers, and awaits the completion
+	 * of currently executing timers.
+	 * 
+	 * <p>This method can be used to cleanly shut down the timer service. The using components
+	 * will not notice that the service is shut down (as for example via exceptions when registering
+	 * a new timer), but the service will simply not fire any timer any more.
+	 */
+	public abstract void quiesceAndAwaitPending() throws InterruptedException;
+
+	/**
+	 * Shuts down and clean up the timer service provider hard and immediately. This does not wait
+	 * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
+	 * will result in a hard exception.
+	 */
+	public abstract void shutdownService();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 77efc7b..905782b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -146,11 +146,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	private AbstractKeyedStateBackend<?> keyedStateBackend;
 
 	/**
-	 * The internal {@link TimeServiceProvider} used to define the current
+	 * The internal {@link ProcessingTimeService} used to define the current
 	 * processing time (default = {@code System.currentTimeMillis()}) and
 	 * register timers for tasks to be executed in the future.
 	 */
-	private TimeServiceProvider timerService;
+	private ProcessingTimeService timerService;
 
 	/** The map of user-defined accumulators of this task */
 	private Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -190,13 +190,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Allows the user to specify his own {@link TimeServiceProvider TimerServiceProvider}.
-	 * By default a {@link DefaultTimeServiceProvider DefaultTimerService} is going to be provided.
+	 * Allows the user to specify his own {@link ProcessingTimeService TimerServiceProvider}.
+	 * By default a {@link SystemProcessingTimeService DefaultTimerService} is going to be provided.
 	 * Changing it can be useful for testing processing time functionality, such as
 	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}
 	 * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}.
 	 * */
-	public void setTimeService(TimeServiceProvider timeProvider) {
+	public void setProcessingTimeService(ProcessingTimeService timeProvider) {
 		if (timeProvider == null) {
 			throw new RuntimeException("The timeProvider cannot be set to null.");
 		}
@@ -224,7 +224,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				ThreadFactory timerThreadFactory =
 					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
 
-				timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
+				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
 			}
 
 			operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -765,10 +765,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	/**
-	 * Returns the {@link TimeServiceProvider} responsible for telling the current
+	 * Returns the {@link ProcessingTimeService} responsible for telling the current
 	 * processing time and registering timers.
 	 */
-	public TimeServiceProvider getTimerService() {
+	public ProcessingTimeService getProcessingTimeService() {
 		if (timerService == null) {
 			throw new IllegalStateException("The timer service has not been initialized.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
new file mode 100644
index 0000000..3fd4202
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
+ * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
+ */
+public class SystemProcessingTimeService extends ProcessingTimeService {
+
+	private static final int STATUS_ALIVE = 0;
+	private static final int STATUS_QUIESCED = 1;
+	private static final int STATUS_SHUTDOWN = 2;
+
+	// ------------------------------------------------------------------------
+
+	/** The containing task that owns this time service provider. */
+	private final AsyncExceptionHandler task;
+
+	/** The lock that timers acquire upon triggering */
+	private final Object checkpointLock;
+
+	/** The executor service that schedules and calls the triggers of this task*/
+	private final ScheduledThreadPoolExecutor timerService;
+
+	private final AtomicInteger status;
+
+
+	public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
+		this(failureHandler, checkpointLock, null);
+	}
+
+	public SystemProcessingTimeService(
+			AsyncExceptionHandler task,
+			Object checkpointLock,
+			ThreadFactory threadFactory) {
+
+		this.task = checkNotNull(task);
+		this.checkpointLock = checkNotNull(checkpointLock);
+
+		this.status = new AtomicInteger(STATUS_ALIVE);
+
+		if (threadFactory == null) {
+			this.timerService = new ScheduledThreadPoolExecutor(1);
+		} else {
+			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
+		}
+
+		// tasks should be removed if the future is canceled
+		this.timerService.setRemoveOnCancelPolicy(true);
+
+		// make sure shutdown removes all pending tasks
+		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+	}
+
+	@Override
+	public long getCurrentProcessingTime() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
+
+		// we directly try to register the timer and only react to the status on exception
+		// that way we save unnecessary volatile accesses for each timer
+		try {
+			return timerService.schedule(
+					new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+		}
+		catch (RejectedExecutionException e) {
+			final int status = this.status.get();
+			if (status == STATUS_QUIESCED) {
+				return new NeverCompleteFuture(delay);
+			}
+			else if (status == STATUS_SHUTDOWN) {
+				throw new IllegalStateException("Timer service is shut down");
+			}
+			else {
+				// something else happened, so propagate the exception
+				throw e;
+			}
+		}
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return status.get() == STATUS_SHUTDOWN;
+	}
+
+	@Override
+	public void quiesceAndAwaitPending() throws InterruptedException {
+		if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+			timerService.shutdown();
+
+			// await forever (almost)
+			timerService.awaitTermination(365, TimeUnit.DAYS);
+		}
+	}
+
+	@Override
+	public void shutdownService() {
+		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
+				status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
+		{
+			timerService.shutdownNow();
+		}
+	}
+
+	// safety net to destroy the thread pool
+	@Override
+	protected void finalize() throws Throwable {
+		super.finalize();
+		timerService.shutdownNow();
+	}
+
+	@VisibleForTesting
+	int getNumTasksScheduled() {
+		BlockingQueue<?> queue = timerService.getQueue();
+		if (queue == null) {
+			return 0;
+		} else {
+			return queue.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Internal task that is invoked by the timer service and triggers the target.
+	 */
+	private static final class TriggerTask implements Runnable {
+
+		private final Object lock;
+		private final Triggerable target;
+		private final long timestamp;
+		private final AsyncExceptionHandler exceptionHandler;
+
+		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) {
+			this.exceptionHandler = exceptionHandler;
+			this.lock = lock;
+			this.target = target;
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public void run() {
+			synchronized (lock) {
+				try {
+					target.trigger(timestamp);
+				} catch (Throwable t) {
+					TimerException asyncException = new TimerException(t);
+					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
+
+		private final Object lock = new Object();
+
+		private final long delayMillis;
+
+		private volatile boolean canceled;
+
+
+		private NeverCompleteFuture(long delayMillis) {
+			this.delayMillis = delayMillis;
+		}
+
+		@Override
+		public long getDelay(@Nonnull TimeUnit unit) {
+			return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+		}
+
+		@Override
+		public int compareTo(@Nonnull Delayed o) {
+			long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+			return Long.compare(this.delayMillis, otherMillis);
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			synchronized (lock) {
+				canceled = true;
+				lock.notifyAll();
+			}
+			return true;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return canceled;
+		}
+
+		@Override
+		public boolean isDone() {
+			return false;
+		}
+
+		@Override
+		public Object get() throws InterruptedException {
+			synchronized (lock) {
+				while (!canceled) {
+					lock.wait();
+				}
+			}
+			throw new CancellationException();
+		}
+
+		@Override
+		public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
+			synchronized (lock) {
+				while (!canceled) {
+					unit.timedWait(lock, timeout);
+				}
+
+				if (canceled) {
+					throw new CancellationException();
+				} else {
+					throw new TimeoutException();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
new file mode 100644
index 0000000..d2bf133
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the
+ * processing time functionality.
+ * */
+public class TestProcessingTimeService extends ProcessingTimeService {
+
+	private volatile long currentTime = 0;
+
+	private volatile boolean isTerminated;
+	private volatile boolean isQuiesced;
+
+	// sorts the timers by timestamp so that they are processed in the correct order.
+	private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
+
+	
+	public void setCurrentTime(long timestamp) throws Exception {
+		this.currentTime = timestamp;
+
+		if (!isQuiesced) {
+			// decide which timers to fire and put them in a list
+			// we do not fire them here to be able to accommodate timers
+			// that register other timers.
+	
+			Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+			List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+			while (it.hasNext()) {
+				Map.Entry<Long, List<Triggerable>> t = it.next();
+				if (t.getKey() <= this.currentTime) {
+					toRun.add(t);
+					it.remove();
+				}
+			}
+	
+			// now do the actual firing.
+			for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+				long now = tasks.getKey();
+				for (Triggerable task: tasks.getValue()) {
+					task.trigger(now);
+				}
+			}
+		}
+	}
+
+	@Override
+	public long getCurrentProcessingTime() {
+		return currentTime;
+	}
+
+	@Override
+	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+		if (isTerminated) {
+			throw new IllegalStateException("terminated");
+		}
+		if (isQuiesced) {
+			return new DummyFuture();
+		}
+
+		if (timestamp <= currentTime) {
+			try {
+				target.trigger(timestamp);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+		List<Triggerable> tasks = registeredTasks.get(timestamp);
+		if (tasks == null) {
+			tasks = new ArrayList<>();
+			registeredTasks.put(timestamp, tasks);
+		}
+		tasks.add(target);
+
+		return new DummyFuture();
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return isTerminated;
+	}
+
+	@Override
+	public void quiesceAndAwaitPending() {
+		if (!isTerminated) {
+			isQuiesced = true;
+			registeredTasks.clear();
+		}
+	}
+
+	@Override
+	public void shutdownService() {
+		this.isTerminated = true;
+	}
+
+	public int getNumRegisteredTimers() {
+		int count = 0;
+		for (List<Triggerable> tasks: registeredTasks.values()) {
+			count += tasks.size();
+		}
+		return count;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class DummyFuture implements ScheduledFuture<Object> {
+
+		@Override
+		public long getDelay(TimeUnit unit) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public int compareTo(Delayed o) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return true;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public boolean isDone() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object get() throws InterruptedException, ExecutionException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
deleted file mode 100644
index 9eb6cd1..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
- * processing time functionality.
- * */
-public class TestTimeServiceProvider extends TimeServiceProvider {
-
-	private volatile long currentTime = 0;
-
-	private volatile boolean isTerminated;
-	private volatile boolean isQuiesced;
-
-	// sorts the timers by timestamp so that they are processed in the correct order.
-	private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
-
-	
-	public void setCurrentTime(long timestamp) throws Exception {
-		this.currentTime = timestamp;
-
-		if (!isQuiesced) {
-			// decide which timers to fire and put them in a list
-			// we do not fire them here to be able to accommodate timers
-			// that register other timers.
-	
-			Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
-			List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
-			while (it.hasNext()) {
-				Map.Entry<Long, List<Triggerable>> t = it.next();
-				if (t.getKey() <= this.currentTime) {
-					toRun.add(t);
-					it.remove();
-				}
-			}
-	
-			// now do the actual firing.
-			for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
-				long now = tasks.getKey();
-				for (Triggerable task: tasks.getValue()) {
-					task.trigger(now);
-				}
-			}
-		}
-	}
-
-	@Override
-	public long getCurrentProcessingTime() {
-		return currentTime;
-	}
-
-	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
-		if (isTerminated) {
-			throw new IllegalStateException("terminated");
-		}
-		if (isQuiesced) {
-			return new DummyFuture();
-		}
-
-		if (timestamp <= currentTime) {
-			try {
-				target.trigger(timestamp);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}
-		List<Triggerable> tasks = registeredTasks.get(timestamp);
-		if (tasks == null) {
-			tasks = new ArrayList<>();
-			registeredTasks.put(timestamp, tasks);
-		}
-		tasks.add(target);
-
-		return new DummyFuture();
-	}
-
-	@Override
-	public boolean isTerminated() {
-		return isTerminated;
-	}
-
-	@Override
-	public void quiesceAndAwaitPending() {
-		if (!isTerminated) {
-			isQuiesced = true;
-			registeredTasks.clear();
-		}
-	}
-
-	@Override
-	public void shutdownService() {
-		this.isTerminated = true;
-	}
-
-	public int getNumRegisteredTimers() {
-		int count = 0;
-		for (List<Triggerable> tasks: registeredTasks.values()) {
-			count += tasks.size();
-		}
-		return count;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static class DummyFuture implements ScheduledFuture<Object> {
-
-		@Override
-		public long getDelay(TimeUnit unit) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int compareTo(Delayed o) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			return true;
-		}
-
-		@Override
-		public boolean isCancelled() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean isDone() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object get() throws InterruptedException, ExecutionException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-			throw new UnsupportedOperationException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
deleted file mode 100644
index afa6f35..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.concurrent.ScheduledFuture;
-
-/**
- * Defines the current processing time and handles all related actions,
- * such as register timers for tasks to be executed in the future.
- * 
- * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
- * whether the timer service has been shut down.
- * 
- * <p>The registration of timers follows a life cycle of three phases:
- * <ol>
- *     <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
- *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
- *         {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
- *         return a "dummy" future as a result. This is used for clean shutdown, where currently firing
- *         timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
- *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
- *         will result in a hard exception.</li>
- * </ol>
- */
-public abstract class TimeServiceProvider {
-
-	/**
-	 * Returns the current processing time.
-	 */
-	public abstract long getCurrentProcessingTime();
-
-	/**
-	 * Registers a task to be executed when (processing) time is {@code timestamp}.
-	 * 
-	 * @param timestamp   Time when the task is to be executed (in processing time)
-	 * @param target      The task to be executed
-	 * 
-	 * @return The future that represents the scheduled task. This always returns some future,
-	 *         even if the timer was shut down
-	 */
-	public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
-
-	/**
-	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
-	 */
-	public abstract boolean isTerminated();
-
-	/**
-	 * This method puts the service into a state where it does not register new timers, but
-	 * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
-	 * Furthermore, the method clears all not yet started timers, and awaits the completion
-	 * of currently executing timers.
-	 * 
-	 * <p>This method can be used to cleanly shut down the timer service. The using components
-	 * will not notice that the service is shut down (as for example via exceptions when registering
-	 * a new timer), but the service will simply not fire any timer any more.
-	 */
-	public abstract void quiesceAndAwaitPending() throws InterruptedException;
-
-	/**
-	 * Shuts down and clean up the timer service provider hard and immediately. This does not wait
-	 * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
-	 * will result in a hard exception.
-	 */
-	public abstract void shutdownService();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 42087b4..f87b5ef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -38,8 +38,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -67,7 +67,7 @@ public class StreamSourceOperatorTest {
 		
 		final List<StreamElement> output = new ArrayList<>();
 		
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.run(new Object(), new CollectorOutput<String>(output));
 		
 		assertEquals(1, output.size());
@@ -84,7 +84,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.cancel();
 
 		// run and exit
@@ -104,7 +104,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 		
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -137,7 +137,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.stop();
 
 		// run and stop
@@ -156,7 +156,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -189,7 +189,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 		// emit latency marks every 10 milliseconds.
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -225,15 +225,15 @@ public class StreamSourceOperatorTest {
 			new StoppableStreamSource<>(new InfiniteSource<String>());
 
 		long watermarkInterval = 10;
-		TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
-		timeProvider.setCurrentTime(0);
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		processingTimeService.setCurrentTime(0);
 
-		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider);
+		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, processingTimeService);
 
 		final List<StreamElement> output = new ArrayList<>();
 
 		StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
-			operator.getContainingTask().getTimerService(),
+			operator.getContainingTask().getProcessingTimeService(),
 			operator.getContainingTask().getCheckpointLock(),
 			new CollectorOutput<String>(output),
 			operator.getExecutionConfig().getAutoWatermarkInterval());
@@ -243,7 +243,7 @@ public class StreamSourceOperatorTest {
 		// going to be aligned with the watermark interval.
 
 		for (long i = 1; i < 100; i += watermarkInterval)  {
-			timeProvider.setCurrentTime(i);
+			processingTimeService.setCurrentTime(i);
 		}
 
 		assertTrue(output.size() == 9);
@@ -257,13 +257,21 @@ public class StreamSourceOperatorTest {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
+	@SuppressWarnings("unchecked")
+	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
+			TimeCharacteristic timeChar,
+			long watermarkInterval,
+			long latencyMarkInterval) {
+		setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService());
+	}
+
 	@SuppressWarnings("unchecked")
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 												TimeCharacteristic timeChar,
 												long watermarkInterval,
 												long latencyMarkInterval,
-												final TimeServiceProvider timeProvider) {
+												final ProcessingTimeService timeProvider) {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -284,12 +292,15 @@ public class StreamSourceOperatorTest {
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
-		doAnswer(new Answer<TimeServiceProvider>() {
+		doAnswer(new Answer<ProcessingTimeService>() {
 			@Override
-			public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
+			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+				if (timeProvider == null) {
+					throw new RuntimeException("The time provider is null.");
+				}
 				return timeProvider;
 			}
-		}).when(mockTask).getTimerService();
+		}).when(mockTask).getProcessingTimeService();
 
 		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 98058e8..fb1fab5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -64,7 +64,7 @@ public class StreamTaskTimerTest {
 		testHarness.waitForTaskRunning();
 
 		// first one spawns thread
-		mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new Triggerable() {
+		mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new Triggerable() {
 			@Override
 			public void trigger(long timestamp) {
 			}
@@ -106,7 +106,7 @@ public class StreamTaskTimerTest {
 			final long t3 = System.currentTimeMillis() + 100;
 			final long t4 = System.currentTimeMillis() + 200;
 
-			TimeServiceProvider timeService = mapTask.getTimerService();
+			ProcessingTimeService timeService = mapTask.getProcessingTimeService();
 			timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
 			timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
 			timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
new file mode 100644
index 0000000..9c2cee3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TestProcessingTimeServiceTest {
+
+	@Test
+	public void testCustomTimeServiceProvider() throws Throwable {
+		TestProcessingTimeService tp = new TestProcessingTimeService();
+
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+		mapTask.setProcessingTimeService(tp);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
+		streamConfig.setStreamOperator(mapOperator);
+
+		testHarness.invoke();
+
+		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 0);
+
+		tp.setCurrentTime(11);
+		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 11);
+
+		tp.setCurrentTime(15);
+		tp.setCurrentTime(16);
+		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 16);
+
+		// register 2 tasks
+		mapTask.getProcessingTimeService().registerTimer(30, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		mapTask.getProcessingTimeService().registerTimer(40, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		assertEquals(2, tp.getNumRegisteredTimers());
+
+		tp.setCurrentTime(35);
+		assertEquals(1, tp.getNumRegisteredTimers());
+
+		tp.setCurrentTime(40);
+		assertEquals(0, tp.getNumRegisteredTimers());
+
+		tp.shutdownService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
+
+		private final AtomicReference<Throwable> errorReference;
+
+		public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+			this.errorReference = errorReference;
+		}
+
+		@Override
+		public void handleAsyncException(String message, Throwable exception) {
+			errorReference.compareAndSet(null, exception);
+		}
+	}
+}