You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:19 UTC

[04/11] flink git commit: [FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
deleted file mode 100644
index a8f2dc4..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class TestTimeProviderTest {
-
-	@Test
-	public void testCustomTimeServiceProvider() throws Throwable {
-		TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-		mapTask.setTimeService(tp);
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
-			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
-		streamConfig.setStreamOperator(mapOperator);
-
-		testHarness.invoke();
-
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
-
-		tp.setCurrentTime(11);
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
-
-		tp.setCurrentTime(15);
-		tp.setCurrentTime(16);
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
-
-		// register 2 tasks
-		mapTask.getTimerService().registerTimer(30, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		mapTask.getTimerService().registerTimer(40, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		assertEquals(2, tp.getNumRegisteredTimers());
-
-		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNumRegisteredTimers());
-
-		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNumRegisteredTimers());
-
-		tp.shutdownService();
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
-
-		private final AtomicReference<Throwable> errorReference;
-
-		public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
-			this.errorReference = errorReference;
-		}
-
-		@Override
-		public void handleAsyncException(String message, Throwable exception) {
-			errorReference.compareAndSet(null, exception);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index 4d5c881..af99d0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;
@@ -41,9 +42,13 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 
 		final ExecutionConfig config = new ExecutionConfig();
 		config.setAutoWatermarkInterval(50);
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		
 		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
+				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService);
+
+		long currentTime = 0;
 
 		testHarness.open();
 		
@@ -71,7 +76,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					// check the invariant
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
-					Thread.sleep(10);
+					currentTime = currentTime + 10;
+					processingTimeService.setCurrentTime(currentTime);
 				}
 			}
 			
@@ -102,7 +108,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					// check the invariant
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
-					Thread.sleep(10);
+					currentTime = currentTime + 10;
+					processingTimeService.setCurrentTime(currentTime);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index e96109e..128c88b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,12 +39,12 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -186,8 +186,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final TimeServiceProvider timerService = new NoOpTimerService();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+			final ProcessingTimeService timerService = new NoOpTimerService();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
 
@@ -234,13 +234,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -299,12 +299,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 			
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -371,12 +371,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -438,12 +438,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 			new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -503,7 +503,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
@@ -542,7 +542,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			timerService = new TestTimeServiceProvider();
+			timerService = new TestProcessingTimeService();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
@@ -583,7 +583,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			// sliding window (200 msecs) every 50 msecs
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -631,7 +631,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 					windowSize, windowSlide);
 
-			timerService = new TestTimeServiceProvider();
+			timerService = new TestProcessingTimeService();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
@@ -684,7 +684,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							new StatefulFunction(), identitySelector,
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 					new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -777,7 +777,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static StreamTask<?, ?> createMockTask(Object lock) {
+	private static StreamTask<?, ?> createMockTask() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
 
@@ -785,7 +785,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
 		when(task.getName()).thenReturn("Test task name");
 		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(task.getCheckpointLock()).thenReturn(lock);
 
 		final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
 		when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -801,10 +800,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
-		final TimeServiceProvider timerService, final Object lock)
+		final ProcessingTimeService timerService)
 	{
-		StreamTask<?, ?> mockTask = createMockTask(lock);
-		when(mockTask.getTimerService()).thenReturn(timerService);
+		StreamTask<?, ?> mockTask = createMockTask();
+		when(mockTask.getProcessingTimeService()).thenReturn(timerService);
 		return mockTask;
 	}
 
@@ -819,7 +818,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		return result;
 	}
 
-	private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+	private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
 		timers.shutdownService();
 
 		while (!timers.isTerminated()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 802329b..bb64a08 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,12 +40,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -194,8 +195,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final TimeServiceProvider timerService = new NoOpTimerService();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+			final ProcessingTimeService timerService = new NoOpTimerService();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AggregatingProcessingTimeWindowOperator<String, String> op;
 
@@ -242,7 +243,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
@@ -255,7 +256,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
@@ -311,13 +312,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -389,13 +390,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -471,12 +472,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -541,12 +542,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+		final ProcessingTimeService timerService = new SystemProcessingTimeService(
 				new ReferenceSettingExceptionHandler(error), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
 
@@ -605,7 +606,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			final int windowSize = 200;
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			// tumbling window that triggers every 50 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -655,7 +656,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
-			timerService = new TestTimeServiceProvider();
+			timerService = new TestProcessingTimeService();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
@@ -698,7 +699,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			// sliding window (200 msecs) every 50 msecs
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -748,7 +749,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
-			timerService = new TestTimeServiceProvider();
+			timerService = new TestProcessingTimeService();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
@@ -796,7 +797,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			final long twoSeconds = 2000;
 			
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			StatefulFunction.globalCounts.clear();
 			
@@ -850,7 +851,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+			TestProcessingTimeService timerService = new TestProcessingTimeService();
 
 			StatefulFunction.globalCounts.clear();
 			
@@ -977,7 +978,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static StreamTask<?, ?> createMockTask(Object lock) {
+	private static StreamTask<?, ?> createMockTask() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
 
@@ -985,7 +986,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
 		when(task.getName()).thenReturn("Test task name");
 		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(task.getCheckpointLock()).thenReturn(lock);
 
 		final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
 		when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -996,10 +996,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		return task;
 	}
 
-	private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock)
+	private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService)
 	{
-		StreamTask<?, ?> mockTask = createMockTask(lock);
-		when(mockTask.getTimerService()).thenReturn(timerService);
+		StreamTask<?, ?> mockTask = createMockTask();
+		when(mockTask.getProcessingTimeService()).thenReturn(timerService);
 		return mockTask;
 	}
 
@@ -1018,7 +1018,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		return result;
 	}
 
-	private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+	private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
 		timers.shutdownService();
 
 		while (!timers.isTerminated()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index d0c5050..a7a71cf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -19,11 +19,11 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.util.concurrent.ScheduledFuture;
 
-class NoOpTimerService extends TimeServiceProvider {
+class NoOpTimerService extends ProcessingTimeService {
 
 	private volatile boolean terminated;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2b0b915..38f0778 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -850,8 +850,6 @@ public class WindowOperatorTest extends TestLogger {
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
-		TestTimeServiceProvider timer = new TestTimeServiceProvider();
-
 		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
 				new SumReducer(),
 				inputType.createSerializer(new ExecutionConfig()));
@@ -869,7 +867,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
-						operator, new ExecutionConfig(), timer,
+						operator, new ExecutionConfig(),
 						new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
 		testHarness.open();
@@ -898,7 +896,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(
-						otherOperator, new ExecutionConfig(), timer,
+						otherOperator, new ExecutionConfig(),
 						new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		otherTestHarness.setup();
@@ -928,7 +926,7 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -987,7 +985,7 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1059,7 +1057,7 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
deleted file mode 100644
index 29e13ed..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import org.junit.Test;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class DefaultTimeServiceProviderTest {
-
-	@Test
-	public void testTriggerHoldsLock() throws Exception {
-
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			assertEquals(0, timer.getNumTasksScheduled());
-
-			// schedule something
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
-				@Override
-				public void trigger(long timestamp) {
-					assertTrue(Thread.holdsLock(lock));
-				}
-			});
-
-			// wait until the execution is over
-			future.get();
-			assertEquals(0, timer.getNumTasksScheduled());
-
-			// check that no asynchronous error was reported
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
-
-	@Test
-	public void testImmediateShutdown() throws Exception {
-
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			assertFalse(timer.isTerminated());
-
-			final OneShotLatch latch = new OneShotLatch();
-
-			// the task should trigger immediately and should block until terminated with interruption
-			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					latch.trigger();
-					Thread.sleep(100000000);
-				}
-			});
-
-			latch.await();
-			timer.shutdownService();
-
-			// can only enter this scope after the triggerable is interrupted
-			//noinspection SynchronizationOnLocalVariableOrMethodParameter
-			synchronized (lock) {
-				assertTrue(timer.isTerminated());
-			}
-
-			try {
-				timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
-					@Override
-					public void trigger(long timestamp) {}
-				});
-
-				fail("should result in an exception");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-
-			// obviously, we have an asynchronous interrupted exception
-			assertNotNull(errorRef.get());
-			assertTrue(errorRef.get().getCause() instanceof InterruptedException);
-
-			assertEquals(0, timer.getNumTasksScheduled());
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
-
-	@Test
-	public void testQuiescing() throws Exception {
-
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			final OneShotLatch latch = new OneShotLatch();
-
-			final ReentrantLock scopeLock = new ReentrantLock();
-
-			timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					scopeLock.lock();
-					try {
-						latch.trigger();
-						// delay a bit before leaving the method
-						Thread.sleep(5);
-					} finally {
-						scopeLock.unlock();
-					}
-				}
-			});
-
-			// after the task triggered, shut the timer down cleanly, waiting for the task to finish
-			latch.await();
-			timer.quiesceAndAwaitPending();
-
-			// should be able to immediately acquire the lock, since the task must have exited by now 
-			assertTrue(scopeLock.tryLock());
-
-			// should be able to schedule more tasks (that never get executed)
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					throw new Exception("test");
-				}
-			});
-			assertNotNull(future);
-
-			// nothing should be scheduled right now
-			assertEquals(0, timer.getNumTasksScheduled());
-
-			// check that no asynchronous error was reported - that ensures that the newly scheduled 
-			// triggerable did, in fact, not trigger
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
-
-	@Test
-	public void testFutureCancellation() throws Exception {
-
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			assertEquals(0, timer.getNumTasksScheduled());
-
-			// schedule something
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
-				@Override
-				public void trigger(long timestamp) {}
-			});
-			assertEquals(1, timer.getNumTasksScheduled());
-
-			future.cancel(false);
-
-			assertEquals(0, timer.getNumTasksScheduled());
-
-			// check that no asynchronous error was reported
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
-
-	@Test
-	public void testExceptionReporting() throws InterruptedException {
-		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
-		final OneShotLatch latch = new OneShotLatch();
-		final Object lock = new Object();
-
-		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
-				new AsyncExceptionHandler() {
-					@Override
-					public void handleAsyncException(String message, Throwable exception) {
-						exceptionWasThrown.set(true);
-						latch.trigger();
-					}
-				}, lock);
-		
-		timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
-			@Override
-			public void trigger(long timestamp) throws Exception {
-				throw new Exception("Exception in Timer");
-			}
-		});
-
-		latch.await();
-		assertTrue(exceptionWasThrown.get());
-	}
-
-	@Test
-	public void testTimerSorting() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			final OneShotLatch sync = new OneShotLatch();
-
-			// we block the timer execution to make sure we have all the time
-			// to register some additional timers out of order
-			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					sync.await();
-				}
-			});
-			
-			// schedule two timers out of order something
-			final long now = System.currentTimeMillis();
-			final long time1 = now + 6;
-			final long time2 = now + 5;
-			final long time3 = now + 8;
-			final long time4 = now - 2;
-
-			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
-			Triggerable trigger = new Triggerable() {
-				@Override
-				public void trigger(long timestamp) {
-					timestamps.add(timestamp);
-				}
-			};
-
-			// schedule
-			ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
-			ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
-			ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
-			ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
-
-			// now that everything is scheduled, unblock the timer service
-			sync.trigger();
-
-			// wait until both are complete
-			future1.get();
-			future2.get();
-			future3.get();
-			future4.get();
-
-			// verify that the order is 4 - 2 - 1 - 3
-			assertEquals(4, timestamps.size());
-			assertEquals(time4, timestamps.take().longValue());
-			assertEquals(time2, timestamps.take().longValue());
-			assertEquals(time1, timestamps.take().longValue());
-			assertEquals(time3, timestamps.take().longValue());
-
-			// check that no asynchronous error was reported
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ce62624..ab7bf69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> {
 		outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
 	}
 
-	public TimeServiceProvider getTimerService() {
+	public ProcessingTimeService getProcessingTimeService() {
 		if (!(task instanceof StreamTask)) {
-			throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks.");
+			throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks.");
 		}
-		return ((StreamTask) task).getTimerService();
+		return ((StreamTask) task).getProcessingTimeService();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
new file mode 100644
index 0000000..e7944df
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SystemProcessingTimeServiceTest {
+
+	@Test
+	public void testTriggerHoldsLock() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) {
+					assertTrue(Thread.holdsLock(lock));
+				}
+			});
+
+			// wait until the execution is over
+			future.get();
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testImmediateShutdown() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertFalse(timer.isTerminated());
+
+			final OneShotLatch latch = new OneShotLatch();
+
+			// the task should trigger immediately and should block until terminated with interruption
+			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					latch.trigger();
+					Thread.sleep(100000000);
+				}
+			});
+
+			latch.await();
+			timer.shutdownService();
+
+			// can only enter this scope after the triggerable is interrupted
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (lock) {
+				assertTrue(timer.isTerminated());
+			}
+
+			try {
+				timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+					@Override
+					public void trigger(long timestamp) {}
+				});
+
+				fail("should result in an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			// obviously, we have an asynchronous interrupted exception
+			assertNotNull(errorRef.get());
+			assertTrue(errorRef.get().getCause() instanceof InterruptedException);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testQuiescing() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			final OneShotLatch latch = new OneShotLatch();
+
+			final ReentrantLock scopeLock = new ReentrantLock();
+
+			timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					scopeLock.lock();
+					try {
+						latch.trigger();
+						// delay a bit before leaving the method
+						Thread.sleep(5);
+					} finally {
+						scopeLock.unlock();
+					}
+				}
+			});
+
+			// after the task triggered, shut the timer down cleanly, waiting for the task to finish
+			latch.await();
+			timer.quiesceAndAwaitPending();
+
+			// should be able to immediately acquire the lock, since the task must have exited by now 
+			assertTrue(scopeLock.tryLock());
+
+			// should be able to schedule more tasks (that never get executed)
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					throw new Exception("test");
+				}
+			});
+			assertNotNull(future);
+
+			// nothing should be scheduled right now
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported - that ensures that the newly scheduled 
+			// triggerable did, in fact, not trigger
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testFutureCancellation() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) {}
+			});
+			assertEquals(1, timer.getNumTasksScheduled());
+
+			future.cancel(false);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testExceptionReporting() throws InterruptedException {
+		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+		final Object lock = new Object();
+
+		ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
+				new AsyncExceptionHandler() {
+					@Override
+					public void handleAsyncException(String message, Throwable exception) {
+						exceptionWasThrown.set(true);
+						latch.trigger();
+					}
+				}, lock);
+		
+		timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
+			@Override
+			public void trigger(long timestamp) throws Exception {
+				throw new Exception("Exception in Timer");
+			}
+		});
+
+		latch.await();
+		assertTrue(exceptionWasThrown.get());
+	}
+
+	@Test
+	public void testTimerSorting() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			final OneShotLatch sync = new OneShotLatch();
+
+			// we block the timer execution to make sure we have all the time
+			// to register some additional timers out of order
+			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					sync.await();
+				}
+			});
+			
+			// schedule two timers out of order something
+			final long now = System.currentTimeMillis();
+			final long time1 = now + 6;
+			final long time2 = now + 5;
+			final long time3 = now + 8;
+			final long time4 = now - 2;
+
+			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
+			Triggerable trigger = new Triggerable() {
+				@Override
+				public void trigger(long timestamp) {
+					timestamps.add(timestamp);
+				}
+			};
+
+			// schedule
+			ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
+			ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
+			ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
+			ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
+
+			// now that everything is scheduled, unblock the timer service
+			sync.trigger();
+
+			// wait until both are complete
+			future1.get();
+			future2.get();
+			future3.get();
+			future4.get();
+
+			// verify that the order is 4 - 2 - 1 - 3
+			assertEquals(4, timestamps.size());
+			assertEquals(time4, timestamps.take().longValue());
+			assertEquals(time2, timestamps.take().longValue());
+			assertEquals(time1, timestamps.take().longValue());
+			assertEquals(time3, timestamps.take().longValue());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 41968e6..6ad684b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -50,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
 
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
- * a {@link AbstractKeyedStateBackend}.
+ * a {@link KeyedStateBackend}.
  *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -94,7 +94,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
 	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
-			TestTimeServiceProvider testTimeProvider,
+			ProcessingTimeService testTimeProvider,
 			KeySelector<IN, K> keySelector,
 			TypeInformation<K> keyType) {
 		super(operator, executionConfig, testTimeProvider);
@@ -187,7 +187,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	}
 
 	/**
-	 *
+	 * 
 	 */
 	@Override
 	public void restore(StreamStateHandle snapshot) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 2dd2163..c2763d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -19,24 +19,14 @@ package org.apache.flink.streaming.util;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class MockContext<IN, OUT> {
 	
@@ -95,17 +85,4 @@ public class MockContext<IN, OUT> {
 
 		return result;
 	}
-
-	private static StreamTask<?, ?> createMockTaskWithTimer(
-		final TimeServiceProvider timerService, final Object lock)
-	{
-		StreamTask<?, ?> task = mock(StreamTask.class);
-		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
-		when(task.getName()).thenReturn("Test task name");
-		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
-		when(task.getCheckpointLock()).thenReturn(lock);
-		when(task.getTimerService()).thenReturn(timerService);
-		return task;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9f8d223..4104049 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -43,11 +43,11 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -79,9 +79,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 	final ExecutionConfig executionConfig;
 
-	final Object checkpointLock;
-
-	final TimeServiceProvider timeServiceProvider;
+	final ProcessingTimeService processingTimeService;
 
 	StreamTask<?, ?> mockTask;
 
@@ -105,36 +103,36 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig) {
-		this(operator, executionConfig, null);
+		this(operator, executionConfig, new TestProcessingTimeService());
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
-			TestTimeServiceProvider testTimeProvider) {
-		this(operator, executionConfig, new Object(), testTimeProvider);
+			ProcessingTimeService processingTimeService) {
+		this(operator, executionConfig, new Object(), processingTimeService);
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
 			Object checkpointLock,
-			TimeServiceProvider testTimeProvider) {
+			ProcessingTimeService processingTimeService) {
 
+		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
 		this.operator = operator;
-		this.outputList = new ConcurrentLinkedQueue<Object>();
+		this.outputList = new ConcurrentLinkedQueue<>();
 		Configuration underlyingConfig = new Configuration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
 		this.executionConfig = executionConfig;
-		this.checkpointLock = checkpointLock;
 		this.closableRegistry = new ClosableRegistry();
 
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
 		mockTask = mock(StreamTask.class);
 
 		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
+		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(env);
@@ -183,15 +181,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			throw new RuntimeException(e.getMessage(), e);
 		}
 
-		timeServiceProvider = testTimeProvider != null ? testTimeProvider :
-			new DefaultTimeServiceProvider(mockTask, this.checkpointLock);
-
-		doAnswer(new Answer<TimeServiceProvider>() {
+		doAnswer(new Answer<ProcessingTimeService>() {
 			@Override
-			public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
-				return timeServiceProvider;
+			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+				return OneInputStreamOperatorTestHarness.this.processingTimeService;
 			}
-		}).when(mockTask).getTimerService();
+		}).when(mockTask).getProcessingTimeService();
 	}
 
 	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
@@ -219,9 +214,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
-	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-	 * to extract only the StreamRecords.
+	 * Get all the output from the task. This contains StreamRecords and Events interleaved.
 	 */
 	public ConcurrentLinkedQueue<Object> getOutput() {
 		return outputList;
@@ -316,8 +309,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public void close() throws Exception {
 		operator.close();
 		operator.dispose();
-		if (timeServiceProvider != null) {
-			timeServiceProvider.shutdownService();
+		if (processingTimeService != null) {
+			processingTimeService.shutdownService();
 		}
 		setupCalled = false;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index a4e26f0..ed9a7cd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -50,7 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class WindowingTestHarness<K, IN, W extends Window> {
 
-	private final TestTimeServiceProvider timeServiceProvider;
+	private final TestProcessingTimeService timeServiceProvider;
 
 	private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
 
@@ -80,7 +80,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 				trigger,
 				allowedLateness);
 
-		timeServiceProvider = new TestTimeServiceProvider();
+		timeServiceProvider = new TestProcessingTimeService();
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 707ce0f..e7f62fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+				getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+				getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}
@@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+				getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+				getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+				getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}