You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:17 UTC

[02/11] flink git commit: [FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 38f0778..b8a764e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -63,7 +63,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -843,71 +842,6 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testRestoreAndSnapshotAreInSync() throws Exception {
-
-		final int WINDOW_SIZE = 3;
-		final int WINDOW_SLIDE = 1;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				EventTimeTrigger.create(),
-				0);
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						operator, new ExecutionConfig(),
-						new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
-		testHarness.open();
-
-		WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
-		WindowOperator.Timer<String, TimeWindow> timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
-		WindowOperator.Timer<String, TimeWindow> timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));
-		operator.processingTimeTimers.add(timer1);
-		operator.processingTimeTimers.add(timer2);
-		operator.processingTimeTimers.add(timer3);
-		operator.processingTimeTimersQueue.add(timer1);
-		operator.processingTimeTimersQueue.add(timer2);
-		operator.processingTimeTimersQueue.add(timer3);
-		
-		StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new WindowOperator<>(
-				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				EventTimeTrigger.create(),
-				0);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						otherOperator, new ExecutionConfig(),
-						new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		otherTestHarness.setup();
-		otherTestHarness.restore(snapshot);
-		otherTestHarness.open();
-
-		Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers);
-		Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray());
-	}
-
-	@Test
 	public void testProcessingTimeTumblingWindows() throws Throwable {
 		final int WINDOW_SIZE = 3;
 
@@ -926,16 +860,14 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
-		testTimeProvider.setCurrentTime(3);
+		testHarness.setProcessingTime(3);
 
 		// timestamp is ignored in processing time
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
@@ -945,7 +877,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 
-		testTimeProvider.setCurrentTime(5000);
+		testHarness.setProcessingTime(5000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
@@ -956,7 +888,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 
-		testTimeProvider.setCurrentTime(7000);
+		testHarness.setProcessingTime(7000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
 
@@ -985,20 +917,18 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// timestamp is ignored in processing time
-		testTimeProvider.setCurrentTime(3);
+		testHarness.setProcessingTime(3);
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(1000);
+		testHarness.setProcessingTime(1000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
 
@@ -1007,7 +937,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(2000);
+		testHarness.setProcessingTime(2000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
@@ -1015,7 +945,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(3000);
+		testHarness.setProcessingTime(3000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
@@ -1026,7 +956,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(7000);
+		testHarness.setProcessingTime(7000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
@@ -1057,23 +987,21 @@ public class WindowOperatorTest extends TestLogger {
 				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				ProcessingTimeTrigger.create(), 0);
 
-		TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// timestamp is ignored in processing time
-		testTimeProvider.setCurrentTime(3);
+		testHarness.setProcessingTime(3);
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1));//Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(1000);
+		testHarness.setProcessingTime(1000);
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE));
 
-		testTimeProvider.setCurrentTime(5000);
+		testHarness.setProcessingTime(5000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
 
@@ -1085,7 +1013,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
 
-		testTimeProvider.setCurrentTime(10000);
+		testHarness.setProcessingTime(10000);
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
@@ -2042,6 +1970,8 @@ public class WindowOperatorTest extends TestLogger {
 				"window-contents",
 				new Tuple2<>((String) null, 0),
 				new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
 						return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
@@ -2183,6 +2113,8 @@ public class WindowOperatorTest extends TestLogger {
 				"window-contents",
 				new Tuple2<>((String) null, 0),
 				new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
 						return new Tuple2<>(value.f0, accumulator.f1 + value.f1);

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 6ad684b..9c9d11b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.mockito.invocation.InvocationOnMock;
@@ -67,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	public KeyedOneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			final KeySelector<IN, K> keySelector,
-			TypeInformation<K> keyType) {
+			TypeInformation<K> keyType) throws Exception {
 		super(operator);
 
 		ClosureCleaner.clean(keySelector, false);
@@ -81,7 +80,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
 			KeySelector<IN, K> keySelector,
-			TypeInformation<K> keyType) {
+			TypeInformation<K> keyType) throws Exception {
 		super(operator, executionConfig);
 
 		ClosureCleaner.clean(keySelector, false);
@@ -92,21 +91,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		setupMockTaskCreateKeyedBackend();
 	}
 
-	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig,
-			ProcessingTimeService testTimeProvider,
-			KeySelector<IN, K> keySelector,
-			TypeInformation<K> keyType) {
-		super(operator, executionConfig, testTimeProvider);
-
-		ClosureCleaner.clean(keySelector, false);
-		config.setStatePartitioner(0, keySelector);
-		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-		config.setNumberOfKeyGroups(MAX_PARALLELISM);
-
-		setupMockTaskCreateKeyedBackend();
-	}
-
 	private void setupMockTaskCreateKeyedBackend() {
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 4104049..8041a7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -52,6 +52,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
@@ -79,7 +81,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 	final ExecutionConfig executionConfig;
 
-	final ProcessingTimeService processingTimeService;
+	final TestProcessingTimeService processingTimeService;
 
 	StreamTask<?, ?> mockTask;
 
@@ -96,30 +98,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 	private volatile boolean wasFailedExternally = false;
 
-	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
 		this(operator, new ExecutionConfig());
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig) {
-		this(operator, executionConfig, new TestProcessingTimeService());
-	}
-
-	public OneInputStreamOperatorTestHarness(
-			OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig,
-			ProcessingTimeService processingTimeService) {
-		this(operator, executionConfig, new Object(), processingTimeService);
-	}
-
-	public OneInputStreamOperatorTestHarness(
-			OneInputStreamOperator<IN, OUT> operator,
-			ExecutionConfig executionConfig,
-			Object checkpointLock,
-			ProcessingTimeService processingTimeService) {
-
-		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+			ExecutionConfig executionConfig) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		Configuration underlyingConfig = new Configuration();
@@ -130,9 +115,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
 		mockTask = mock(StreamTask.class);
+		processingTimeService = new TestProcessingTimeService();
+		processingTimeService.setCurrentTime(0);
 
 		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+		when(mockTask.getCheckpointLock()).thenReturn(new Object());
 		when(mockTask.getConfiguration()).thenReturn(config);
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(env);
@@ -184,7 +171,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		doAnswer(new Answer<ProcessingTimeService>() {
 			@Override
 			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
-				return OneInputStreamOperatorTestHarness.this.processingTimeService;
+				return processingTimeService;
 			}
 		}).when(mockTask).getProcessingTimeService();
 	}
@@ -221,6 +208,21 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
+	 * Returns the StreamRecords found in the task output as a new list; other output elements
+	 * are skipped. This method does not itself clear the output buffer.
+	 */
+	@SuppressWarnings("unchecked")
+	public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
+		List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
+		for (Object e: getOutput()) {
+			if (e instanceof StreamRecord) {
+				resultElements.add((StreamRecord<OUT>) e);
+			}
+		}
+		return resultElements;
+	}
+
+	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
 	 */
 	public void setup() throws Exception {
@@ -327,6 +329,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}
 	}
 
+	public void setProcessingTime(long time) throws Exception {
+		processingTimeService.setCurrentTime(time);
+	}
+
 	public void processWatermark(Watermark mark) throws Exception {
 		operator.processWatermark(mark);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index ed9a7cd..9a1b512 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -50,8 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class WindowingTestHarness<K, IN, W extends Window> {
 
-	private final TestProcessingTimeService timeServiceProvider;
-
 	private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
 
 	private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>();
@@ -64,7 +61,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 								TypeInformation<IN> inputType,
 								KeySelector<IN, K> keySelector,
 								Trigger<? super IN, ? super W> trigger,
-								long allowedLateness) {
+								long allowedLateness) throws Exception {
 
 		ListStateDescriptor<IN> windowStateDesc =
 				new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig));
@@ -80,8 +77,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 				trigger,
 				allowedLateness);
 
-		timeServiceProvider = new TestProcessingTimeService();
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, keySelector, keyType);
 	}
 
 	/**
@@ -106,7 +102,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 	 */
 	public void setProcessingTime(long timestamp) throws Exception {
 		openOperator();
-		timeServiceProvider.setCurrentTime(timestamp);
+		testHarness.setProcessingTime(timestamp);
 	}
 
 	/**