You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:17 UTC
[02/11] flink git commit: [FLINK-4877] Refactor OperatorTestHarness
to always use TestProcessingTimeService
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 38f0778..b8a764e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -63,7 +63,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -843,71 +842,6 @@ public class WindowOperatorTest extends TestLogger {
}
@Test
- public void testRestoreAndSnapshotAreInSync() throws Exception {
-
- final int WINDOW_SIZE = 3;
- final int WINDOW_SLIDE = 1;
-
- TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
- ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
- new SumReducer(),
- inputType.createSerializer(new ExecutionConfig()));
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- stateDesc,
- new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- EventTimeTrigger.create(),
- 0);
-
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(
- operator, new ExecutionConfig(),
- new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.open();
-
- WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
- WindowOperator.Timer<String, TimeWindow> timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
- WindowOperator.Timer<String, TimeWindow> timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));
- operator.processingTimeTimers.add(timer1);
- operator.processingTimeTimers.add(timer2);
- operator.processingTimeTimers.add(timer3);
- operator.processingTimeTimersQueue.add(timer1);
- operator.processingTimeTimersQueue.add(timer2);
- operator.processingTimeTimersQueue.add(timer3);
-
- StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new WindowOperator<>(
- SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- stateDesc,
- new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- EventTimeTrigger.create(),
- 0);
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(
- otherOperator, new ExecutionConfig(),
- new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- otherTestHarness.setup();
- otherTestHarness.restore(snapshot);
- otherTestHarness.open();
-
- Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers);
- Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray());
- }
-
- @Test
public void testProcessingTimeTumblingWindows() throws Throwable {
final int WINDOW_SIZE = 3;
@@ -926,16 +860,14 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
- testTimeProvider.setCurrentTime(3);
+ testHarness.setProcessingTime(3);
// timestamp is ignored in processing time
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
@@ -945,7 +877,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
- testTimeProvider.setCurrentTime(5000);
+ testHarness.setProcessingTime(5000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
@@ -956,7 +888,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
- testTimeProvider.setCurrentTime(7000);
+ testHarness.setProcessingTime(7000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
@@ -985,20 +917,18 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
- testTimeProvider.setCurrentTime(3);
+ testHarness.setProcessingTime(3);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(1000);
+ testHarness.setProcessingTime(1000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
@@ -1007,7 +937,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(2000);
+ testHarness.setProcessingTime(2000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
@@ -1015,7 +945,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(3000);
+ testHarness.setProcessingTime(3000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
@@ -1026,7 +956,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(7000);
+ testHarness.setProcessingTime(7000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
@@ -1057,23 +987,21 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
- testTimeProvider.setCurrentTime(3);
+ testHarness.setProcessingTime(3);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1));//Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(1000);
+ testHarness.setProcessingTime(1000);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE));
- testTimeProvider.setCurrentTime(5000);
+ testHarness.setProcessingTime(5000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
@@ -1085,7 +1013,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
- testTimeProvider.setCurrentTime(10000);
+ testHarness.setProcessingTime(10000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
@@ -2042,6 +1970,8 @@ public class WindowOperatorTest extends TestLogger {
"window-contents",
new Tuple2<>((String) null, 0),
new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
@@ -2183,6 +2113,8 @@ public class WindowOperatorTest extends TestLogger {
"window-contents",
new Tuple2<>((String) null, 0),
new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 6ad684b..9c9d11b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.mockito.invocation.InvocationOnMock;
@@ -67,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public KeyedOneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
final KeySelector<IN, K> keySelector,
- TypeInformation<K> keyType) {
+ TypeInformation<K> keyType) throws Exception {
super(operator);
ClosureCleaner.clean(keySelector, false);
@@ -81,7 +80,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
KeySelector<IN, K> keySelector,
- TypeInformation<K> keyType) {
+ TypeInformation<K> keyType) throws Exception {
super(operator, executionConfig);
ClosureCleaner.clean(keySelector, false);
@@ -92,21 +91,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
setupMockTaskCreateKeyedBackend();
}
- public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
- ExecutionConfig executionConfig,
- ProcessingTimeService testTimeProvider,
- KeySelector<IN, K> keySelector,
- TypeInformation<K> keyType) {
- super(operator, executionConfig, testTimeProvider);
-
- ClosureCleaner.clean(keySelector, false);
- config.setStatePartitioner(0, keySelector);
- config.setStateKeySerializer(keyType.createSerializer(executionConfig));
- config.setNumberOfKeyGroups(MAX_PARALLELISM);
-
- setupMockTaskCreateKeyedBackend();
- }
-
private void setupMockTaskCreateKeyedBackend() {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 4104049..8041a7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -52,6 +52,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.mockito.Matchers.any;
@@ -79,7 +81,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final ExecutionConfig executionConfig;
- final ProcessingTimeService processingTimeService;
+ final TestProcessingTimeService processingTimeService;
StreamTask<?, ?> mockTask;
@@ -96,30 +98,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
private volatile boolean wasFailedExternally = false;
- public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+ public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
this(operator, new ExecutionConfig());
}
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
- ExecutionConfig executionConfig) {
- this(operator, executionConfig, new TestProcessingTimeService());
- }
-
- public OneInputStreamOperatorTestHarness(
- OneInputStreamOperator<IN, OUT> operator,
- ExecutionConfig executionConfig,
- ProcessingTimeService processingTimeService) {
- this(operator, executionConfig, new Object(), processingTimeService);
- }
-
- public OneInputStreamOperatorTestHarness(
- OneInputStreamOperator<IN, OUT> operator,
- ExecutionConfig executionConfig,
- Object checkpointLock,
- ProcessingTimeService processingTimeService) {
-
- this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+ ExecutionConfig executionConfig) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
Configuration underlyingConfig = new Configuration();
@@ -130,9 +115,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
mockTask = mock(StreamTask.class);
+ processingTimeService = new TestProcessingTimeService();
+ processingTimeService.setCurrentTime(0);
when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+ when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(config);
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
when(mockTask.getEnvironment()).thenReturn(env);
@@ -184,7 +171,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
doAnswer(new Answer<ProcessingTimeService>() {
@Override
public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
- return OneInputStreamOperatorTestHarness.this.processingTimeService;
+ return processingTimeService;
}
}).when(mockTask).getProcessingTimeService();
}
@@ -221,6 +208,21 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
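+ * Collects the {@link StreamRecord}s currently in the task output into a new list; elements of any other type are skipped.
+ * This method only reads the output returned by {@code getOutput()} and does not itself clear the output buffer.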
+ */
+ @SuppressWarnings("unchecked")
+ public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
+ List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
+ for (Object e: getOutput()) {
+ if (e instanceof StreamRecord) {
+ resultElements.add((StreamRecord<OUT>) e);
+ }
+ }
+ return resultElements;
+ }
+
+ /**
* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
*/
public void setup() throws Exception {
@@ -327,6 +329,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
}
+ public void setProcessingTime(long time) throws Exception {
+ processingTimeService.setCurrentTime(time);
+ }
+
public void processWatermark(Watermark mark) throws Exception {
operator.processWatermark(mark);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index ed9a7cd..9a1b512 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -50,8 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class WindowingTestHarness<K, IN, W extends Window> {
- private final TestProcessingTimeService timeServiceProvider;
-
private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>();
@@ -64,7 +61,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
TypeInformation<IN> inputType,
KeySelector<IN, K> keySelector,
Trigger<? super IN, ? super W> trigger,
- long allowedLateness) {
+ long allowedLateness) throws Exception {
ListStateDescriptor<IN> windowStateDesc =
new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig));
@@ -80,8 +77,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
trigger,
allowedLateness);
- timeServiceProvider = new TestProcessingTimeService();
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, keySelector, keyType);
}
/**
@@ -106,7 +102,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
*/
public void setProcessingTime(long timestamp) throws Exception {
openOperator();
- timeServiceProvider.setCurrentTime(timestamp);
+ testHarness.setProcessingTime(timestamp);
}
/**