You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:18 UTC
[03/11] flink git commit: [FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService
[FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService
Previously, the test harness allowed passing in a custom ProcessingTimeService,
but in practice this was always a TestProcessingTimeService.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30554758
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30554758
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30554758
Branch: refs/heads/master
Commit: 30554758897842ad851dc9b6e1758d452f7d702f
Parents: e112a63
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 28 16:43:40 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:04 2016 +0200
----------------------------------------------------------------------
.../hdfstests/ContinuousFileMonitoringTest.java | 18 +-
.../fs/bucketing/BucketingSinkTest.java | 69 ++-
...stampsAndPeriodicWatermarksOperatorTest.java | 8 +-
...AlignedProcessingTimeWindowOperatorTest.java | 355 ++++++--------
...AlignedProcessingTimeWindowOperatorTest.java | 475 ++++++-------------
.../operators/windowing/CollectingOutput.java | 86 ----
.../operators/windowing/NoOpTimerService.java | 52 --
.../operators/windowing/WindowOperatorTest.java | 106 +----
.../KeyedOneInputStreamOperatorTestHarness.java | 20 +-
.../util/OneInputStreamOperatorTestHarness.java | 50 +-
.../streaming/util/WindowingTestHarness.java | 10 +-
11 files changed, 384 insertions(+), 865 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 971d5f8..56d8efc 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
@@ -127,20 +126,21 @@ public class ContinuousFileMonitoringTest {
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
reader.setOutputType(typeInfo, executionConfig);
- final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService();
final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
- new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+ new OneInputStreamOperatorTestHarness<>(reader, executionConfig);
+
tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
tester.open();
Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
// test that watermarks are correctly emitted
- timeServiceProvider.setCurrentTime(201);
- timeServiceProvider.setCurrentTime(301);
- timeServiceProvider.setCurrentTime(401);
- timeServiceProvider.setCurrentTime(501);
+ tester.setProcessingTime(201);
+ tester.setProcessingTime(301);
+ tester.setProcessingTime(401);
+ tester.setProcessingTime(501);
int i = 0;
for(Object line: tester.getOutput()) {
@@ -170,8 +170,8 @@ public class ContinuousFileMonitoringTest {
for(FileInputSplit split: splits) {
// set the next "current processing time".
- long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
- timeServiceProvider.setCurrentTime(nextTimestamp);
+ long nextTimestamp = tester.getProcessingTime() + watermarkInterval;
+ tester.setProcessingTime(nextTimestamp);
// send the next split to be read and wait until it is fully read.
tester.processElement(new StreamRecord<>(split));
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 0c0111c..f4b3cd7 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
@@ -70,7 +68,7 @@ public class BucketingSinkTest {
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
- private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) {
+ private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir) throws Exception {
BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath())
.setBucketer(new Bucketer<String>() {
private static final long serialVersionUID = 1L;
@@ -87,12 +85,12 @@ public class BucketingSinkTest {
.setInactiveBucketThreshold(5*60*1000L)
.setPendingSuffix(".pending");
- return createTestSink(sink, clock);
+ return createTestSink(sink);
}
- private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink,
- TestTimeServiceProvider clock) {
- return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock);
+ private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+ BucketingSink<T> sink) throws Exception {
+ return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig());
}
@BeforeClass
@@ -121,10 +119,7 @@ public class BucketingSinkTest {
public void testCheckpointWithoutNotify() throws Exception {
File dataDir = tempFolder.newFolder();
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
testHarness.setup();
testHarness.open();
@@ -133,13 +128,13 @@ public class BucketingSinkTest {
testHarness.processElement(new StreamRecord<>("Hello"));
testHarness.processElement(new StreamRecord<>("Hello"));
- clock.setCurrentTime(10000L);
+ testHarness.setProcessingTime(10000L);
// snapshot but don't call notify to simulate a notify that never
// arrives, the sink should move pending files in restore() in that case
StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
- testHarness = createTestSink(dataDir, clock);
+ testHarness = createTestSink(dataDir);
testHarness.setup();
testHarness.restore(snapshot1);
testHarness.open();
@@ -175,16 +170,15 @@ public class BucketingSinkTest {
final int numElements = 20;
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
-
BucketingSink<String> sink = new BucketingSink<String>(outPath)
.setBucketer(new BasePathBucketer<String>())
.setPartPrefix("part")
.setPendingPrefix("")
.setPendingSuffix("");
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock);
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink);
+
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -217,9 +211,6 @@ public class BucketingSinkTest {
final int numElements = 20;
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
-
BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
.setWriter(new SequenceFileWriter<IntWritable, Text>())
.setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>())
@@ -230,7 +221,9 @@ public class BucketingSinkTest {
sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness =
- createTestSink(sink, clock);
+ createTestSink(sink);
+
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -271,9 +264,6 @@ public class BucketingSinkTest {
final int numElements = 20;
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
-
Map<String, String> properties = new HashMap<>();
Schema keySchema = Schema.create(Schema.Type.INT);
Schema valueSchema = Schema.create(Schema.Type.STRING);
@@ -290,7 +280,9 @@ public class BucketingSinkTest {
.setPendingSuffix("");
OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
- createTestSink(sink, clock);
+ createTestSink(sink);
+
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -325,8 +317,8 @@ public class BucketingSinkTest {
/**
* This uses {@link DateTimeBucketer} to
- * produce rolling files. A custom {@link TimeServiceProvider} is set
- * to simulate the advancing of time alongside the processing of elements.
+ * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually
+ * advance processing time.
*/
@Test
public void testDateTimeRollingStringWriter() throws Exception {
@@ -334,16 +326,15 @@ public class BucketingSinkTest {
final String outPath = hdfsURI + "/rolling-out";
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
-
BucketingSink<String> sink = new BucketingSink<String>(outPath)
.setBucketer(new DateTimeBucketer<String>("ss"))
.setPartPrefix("part")
.setPendingPrefix("")
.setPendingSuffix("");
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock);
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink);
+
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -351,7 +342,7 @@ public class BucketingSinkTest {
for (int i = 0; i < numElements; i++) {
// Every 5 elements, increase the clock time. We should end up with 5 elements per bucket.
if (i % 5 == 0) {
- clock.setCurrentTime(i * 1000L);
+ testHarness.setProcessingTime(i * 1000L);
}
testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i)));
}
@@ -427,10 +418,9 @@ public class BucketingSinkTest {
final int numIds = 4;
final int numElements = 20;
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -465,10 +455,9 @@ public class BucketingSinkTest {
final int step2NumIds = 2;
final int numElementsPerStep = 20;
- TestTimeServiceProvider clock = new TestTimeServiceProvider();
- clock.setCurrentTime(0L);
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir);
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock);
+ testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
@@ -477,13 +466,13 @@ public class BucketingSinkTest {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds)));
}
- clock.setCurrentTime(2*60*1000L);
+ testHarness.setProcessingTime(2*60*1000L);
for (int i = 0; i < numElementsPerStep; i++) {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
}
- clock.setCurrentTime(6*60*1000L);
+ testHarness.setProcessingTime(6*60*1000L);
for (int i = 0; i < numElementsPerStep; i++) {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index af99d0d..febfcde 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -43,10 +43,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
final ExecutionConfig config = new ExecutionConfig();
config.setAutoWatermarkInterval(50);
- TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
- new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService);
+ new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
long currentTime = 0;
@@ -77,7 +75,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
assertTrue(lastWatermark < nextElementValue);
} else {
currentTime = currentTime + 10;
- processingTimeService.setCurrentTime(currentTime);
+ testHarness.setProcessingTime(currentTime);
}
}
@@ -109,7 +107,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
assertTrue(lastWatermark < nextElementValue);
} else {
currentTime = currentTime + 10;
- processingTimeService.setCurrentTime(currentTime);
+ testHarness.setProcessingTime(currentTime);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 128c88b..720258e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,15 +36,10 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
@@ -59,7 +54,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -68,7 +62,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings({"serial"})
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
@@ -183,45 +177,57 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
+
try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final ProcessingTimeService timerService = new NoOpTimerService();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ AccumulatingProcessingTimeWindowOperator<String, String, String> op =
+ new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+ StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- AccumulatingProcessingTimeWindowOperator<String, String, String> op;
+ KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
+
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
+
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
+
assertTrue(op.getNextSlideTime() % 500 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 100 == 0);
- assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
+
+ assertEquals(0, op.getNextSlideTime() % 100);
+ assertEquals(0, op.getNextEvaluationTime() % 1100);
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
@@ -231,16 +237,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindow() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
-
try {
final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -249,31 +247,23 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numElements = 1000;
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
+ testHarness.open();
- // get and verify the result
- out.waitForNElements(numElements, 60_000);
+ final int numElements = 1000;
- timerService.quiesceAndAwaitPending();
+ long currentTime = 0;
- synchronized (lock) {
- op.close();
+ for (int i = 0; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
}
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
- List<Integer> result = out.getElements();
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(numElements, result.size());
Collections.sort(result);
@@ -281,102 +271,70 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertEquals(i, result.get(i).intValue());
}
- if (error.get() != null) {
- throw new Exception(error.get());
- }
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testSlidingWindow() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
+ // tumbling window that triggers every 20 milliseconds
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
- // tumbling window that triggers every 20 milliseconds
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
+ testHarness.open();
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
+ final int numElements = 1000;
- timerService.quiesceAndAwaitPending();
+ long currentTime = 0;
- synchronized (lock) {
- op.close();
- }
+ for (int i = 0; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
+ // get and verify the result
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
- // get and verify the result
- List<Integer> result = out.getElements();
+ // if we kept this running, each element would be in the result three times (for each slide).
+ // we are closing the window before the final panes are through three times, so we may have less
+ // elements.
+ if (result.size() < numElements || result.size() > 3 * numElements) {
+ fail("Wrong number of results: " + result.size());
+ }
- // if we kept this running, each element would be in the result three times (for each slide).
- // we are closing the window before the final panes are through three times, so we may have less
- // elements.
- if (result.size() < numElements || result.size() > 3 * numElements) {
- fail("Wrong number of results: " + result.size());
- }
+ Collections.sort(result);
+ int lastNum = -1;
+ int lastCount = -1;
- Collections.sort(result);
- int lastNum = -1;
- int lastCount = -1;
-
- for (int num : result) {
- if (num == lastNum) {
- lastCount++;
- assertTrue(lastCount <= 3);
- }
- else {
- lastNum = num;
- lastCount = 1;
- }
+ for (int num : result) {
+ if (num == lastNum) {
+ lastCount++;
+ assertTrue(lastCount <= 3);
}
-
- if (error.get() != null) {
- throw new Exception(error.get());
+ else {
+ lastNum = num;
+ lastCount = 1;
}
- } finally {
- timerService.shutdownService();
}
+
+ testHarness.close();
}
@Test
public void testTumblingWindowSingleElements() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -384,66 +342,46 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
validatingIdentityFunction, identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
- out.waitForNElements(2, 60000);
+ testHarness.open();
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(3));
- op.processElement(new StreamRecord<Integer>(4));
- op.processElement(new StreamRecord<Integer>(5));
- }
- out.waitForNElements(5, 60000);
+ testHarness.setProcessingTime(0);
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(6));
- }
- out.waitForNElements(6, 60000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
+ testHarness.processElement(new StreamRecord<>(1));
+ testHarness.processElement(new StreamRecord<>(2));
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+ testHarness.setProcessingTime(50);
- timerService.quiesceAndAwaitPending();
+ testHarness.processElement(new StreamRecord<>(3));
+ testHarness.processElement(new StreamRecord<>(4));
+ testHarness.processElement(new StreamRecord<>(5));
- synchronized (lock) {
- op.close();
- }
+ testHarness.setProcessingTime(100);
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
+ testHarness.processElement(new StreamRecord<>(6));
- if (error.get() != null) {
- throw new Exception(error.get());
- }
+ testHarness.setProcessingTime(200);
+
+
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+ assertEquals(6, result.size());
+
+ Collections.sort(result);
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testSlidingWindowSingleElements() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
-
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -451,44 +389,33 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
validatingIdentityFunction, identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
+ testHarness.setProcessingTime(0);
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(1));
+ testHarness.processElement(new StreamRecord<>(2));
+
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- out.waitForNElements(6, 120000);
-
- List<Integer> result = out.getElements();
assertEquals(6, result.size());
Collections.sort(result);
assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
- timerService.quiesceAndAwaitPending();
-
- synchronized (lock) {
- op.close();
- }
-
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
@@ -503,15 +430,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
testHarness.setup();
testHarness.open();
- timerService.setCurrentTime(0);
+ testHarness.setProcessingTime(0);
// inject some elements
final int numElementsFirst = 700;
@@ -542,8 +467,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- timerService = new TestProcessingTimeService();
- testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
testHarness.setup();
testHarness.restore(state);
@@ -554,7 +478,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(i));
}
- timerService.setCurrentTime(400);
+ testHarness.setProcessingTime(400);
// get and verify the result
List<Integer> finalResult = new ArrayList<>();
@@ -568,7 +492,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertEquals(i, finalResult.get(i).intValue());
}
testHarness.close();
- op.dispose();
}
catch (Exception e) {
e.printStackTrace();
@@ -583,8 +506,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
// sliding window (200 msecs) every 50 msecs
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
new AccumulatingProcessingTimeWindowOperator<>(
@@ -593,9 +514,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
windowSize, windowSlide);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
- timerService.setCurrentTime(0);
+ testHarness.setProcessingTime(0);
testHarness.setup();
testHarness.open();
@@ -623,7 +544,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
testHarness.close();
- op.dispose();
// re-create the operator and restore the state
op = new AccumulatingProcessingTimeWindowOperator<>(
@@ -631,8 +551,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSlide);
- timerService = new TestProcessingTimeService();
- testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
testHarness.setup();
testHarness.restore(state);
@@ -644,13 +563,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(i));
}
- timerService.setCurrentTime(50);
- timerService.setCurrentTime(100);
- timerService.setCurrentTime(150);
- timerService.setCurrentTime(200);
- timerService.setCurrentTime(250);
- timerService.setCurrentTime(300);
- timerService.setCurrentTime(350);
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+ testHarness.setProcessingTime(200);
+ testHarness.setProcessingTime(250);
+ testHarness.setProcessingTime(300);
+ testHarness.setProcessingTime(350);
// get and verify the result
List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -684,14 +603,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
new StatefulFunction(), identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), identitySelector, BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
- timerService.setCurrentTime(0);
+ testHarness.setProcessingTime(0);
testHarness.processElement(new StreamRecord<>(1));
testHarness.processElement(new StreamRecord<>(2));
@@ -703,7 +620,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
op.processElement(new StreamRecord<>(2));
op.processElement(new StreamRecord<>(2));
- timerService.setCurrentTime(1000);
+ testHarness.setProcessingTime(1000);
List<Integer> result = extractFromStreamRecords(testHarness.getOutput());
assertEquals(8, result.size());
@@ -808,7 +725,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+ private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
List<T> result = new ArrayList<>();
for (Object in : input) {
if (in instanceof StreamRecord) {
@@ -824,5 +741,5 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
while (!timers.isTerminated()) {
Thread.sleep(2);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index bb64a08..7ca5753 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
@@ -32,24 +31,13 @@ import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
import org.junit.After;
import org.junit.Test;
@@ -57,21 +45,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings("serial")
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
@@ -79,23 +64,23 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-
- private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
+
+ private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
new KeySelector<Tuple2<Integer,Integer>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer,Integer> value) {
return value.f0;
}
};
-
+
private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
};
-
- private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer =
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer =
new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
.createSerializer(new ExecutionConfig());
@@ -107,14 +92,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return diff0 != 0 ? diff0 : diff1;
}
};
-
+
// ------------------------------------------------------------------------
public AggregatingAlignedProcessingTimeWindowOperatorTest() {
ClosureCleaner.clean(fieldOneSelector, false);
ClosureCleaner.clean(sumFunction, false);
}
-
+
// ------------------------------------------------------------------------
@After
@@ -131,9 +116,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue("Not all trigger threads where properly shut down",
StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
}
-
+
// ------------------------------------------------------------------------
-
+
@Test
public void testInvalidParameters() {
try {
@@ -141,7 +126,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertInvalidParameter(10000L, -1L);
assertInvalidParameter(-1L, 1000L);
assertInvalidParameter(1000L, 2000L);
-
+
// actual internal slide is too low here:
assertInvalidParameter(1000L, 999L);
}
@@ -150,12 +135,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testWindowSizeAndSlide() {
try {
AggregatingProcessingTimeWindowOperator<String, String> op;
-
+
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
assertEquals(5000, op.getWindowSize());
@@ -193,44 +178,51 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final ProcessingTimeService timerService = new NoOpTimerService();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
- AggregatingProcessingTimeWindowOperator<String, String> op;
+ AggregatingProcessingTimeWindowOperator<String, String> op =
+ new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+ StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
+
+ KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.open();
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut);
- op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.open();
+
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.open();
+
assertTrue(op.getNextSlideTime() % 500 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
+ testHarness.close();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
+
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.open();
+
assertTrue(op.getNextSlideTime() % 100 == 0);
assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
@@ -240,85 +232,54 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindowUniqueElements() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
- final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
-
+
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
sumFunction, fieldOneSelector,
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
- op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
- op.open();
+ testHarness.open();
final int numElements = 1000;
+ long currentTime = 0;
+
for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
- op.setKeyContextElement1(next);
- op.processElement(next);
- }
- Thread.sleep(1);
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ testHarness.processElement(next);
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
}
- out.waitForNElements(numElements, 60_000);
-
// get and verify the result
- List<Tuple2<Integer, Integer>> result = out.getElements();
+ List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(numElements, result.size());
- timerService.quiesceAndAwaitPending();
-
- synchronized (lock) {
- op.close();
- }
-
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
-
+ testHarness.close();
Collections.sort(result, tupleComparator);
for (int i = 0; i < numElements; i++) {
assertEquals(i, result.get(i).f0.intValue());
assertEquals(i, result.get(i).f1.intValue());
}
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testTumblingWindowDuplicateElements() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
- final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
-
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -326,43 +287,39 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
- op.open();
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setProcessingTime(0);
+ testHarness.open();
final int numWindows = 10;
long previousNextTime = 0;
int window = 1;
-
- while (window <= numWindows) {
- synchronized (lock) {
- long nextTime = op.getNextEvaluationTime();
- int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
- StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(val, val));
- op.setKeyContextElement1(next);
- op.processElement(next);
-
- if (nextTime != previousNextTime) {
- window++;
- previousNextTime = nextTime;
- }
- }
- Thread.sleep(1);
- }
- out.waitForNElements(numWindows, 60_000);
+ long currentTime = 0;
- List<Tuple2<Integer, Integer>> result = out.getElements();
+ while (window <= numWindows) {
+ long nextTime = op.getNextEvaluationTime();
+ int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
- timerService.quiesceAndAwaitPending();
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(val, val));
+ testHarness.processElement(next);
- synchronized (lock) {
- op.close();
+ if (nextTime != previousNextTime) {
+ window++;
+ previousNextTime = nextTime;
+ }
+ currentTime = currentTime + 1;
+ testHarness.setProcessingTime(currentTime);
}
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
+ testHarness.setProcessingTime(currentTime + 100);
+
+ List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+ testHarness.close();
// we have ideally one element per window. we may have more, when we emitted a value into the
// successive window (corner case), so we can have twice the number of elements, in the worst case.
@@ -371,33 +328,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// deduplicate for more accurate checks
HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
assertTrue(set.size() == 10);
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testSlidingWindow() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
-
try {
- final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -405,32 +345,27 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
150, 50);
- op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
- op.open();
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
final int numElements = 1000;
+ long currentTime = 0;
+
for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
- op.setKeyContextElement1(next);
- op.processElement(next);
- }
- Thread.sleep(1);
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ testHarness.processElement(next);
+ currentTime = currentTime + 1;
+ testHarness.setProcessingTime(currentTime);
}
- timerService.quiesceAndAwaitPending();
-
- synchronized (lock) {
- op.close();
- }
+ // get and verify the result
+ List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
+ testHarness.close();
- // get and verify the result
- List<Tuple2<Integer, Integer>> result = out.getElements();
-
// every element can occur between one and three times
if (result.size() < numElements || result.size() > 3 * numElements) {
System.out.println(result);
@@ -440,10 +375,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
Collections.sort(result, tupleComparator);
int lastNum = -1;
int lastCount = -1;
-
+
for (Tuple2<Integer, Integer> val : result) {
assertEquals(val.f0, val.f1);
-
+
if (val.f0 == lastNum) {
lastCount++;
assertTrue(lastCount <= 3);
@@ -453,58 +388,42 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
lastCount = 1;
}
}
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testSlidingWindowSingleElements() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
-
try {
- final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
sumFunction, fieldOneSelector,
IntSerializer.INSTANCE, tupleSerializer, 150, 50);
- op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
- op.open();
-
- synchronized (lock) {
- StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
- op.setKeyContextElement1(next1);
- op.processElement(next1);
-
- StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
- op.setKeyContextElement1(next2);
- op.processElement(next2);
- }
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.setProcessingTime(0);
+
+ StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
+ testHarness.processElement(next1);
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- out.waitForNElements(6, 120000);
-
- List<Tuple2<Integer, Integer>> result = out.getElements();
+ StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
+ testHarness.processElement(next2);
+
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+
+ List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(6, result.size());
-
+
Collections.sort(result, tupleComparator);
assertEquals(Arrays.asList(
new Tuple2<>(1, 1),
@@ -515,40 +434,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
new Tuple2<>(2, 2)
), result);
- timerService.quiesceAndAwaitPending();
-
- synchronized (lock) {
- op.close();
- }
-
- shutdownTimerServiceAndWait(timerService);
- op.dispose();
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
+ testHarness.close();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testPropagateExceptionsFromProcessElement() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final ProcessingTimeService timerService = new SystemProcessingTimeService(
- new ReferenceSettingExceptionHandler(error), lock);
try {
- final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
-
ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
// the operator has a window time that is so long that it will not fire in this test
@@ -559,46 +456,31 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
hundredYears, hundredYears);
- op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
- op.open();
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
for (int i = 0; i < 100; i++) {
- synchronized (lock) {
- StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
- op.setKeyContextElement1(next);
- op.processElement(next);
- }
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
+ testHarness.processElement(next);
}
-
+
try {
StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
- op.setKeyContextElement1(next);
- op.processElement(next);
+ testHarness.processElement(next);
fail("This fail with an exception");
}
catch (Exception e) {
assertTrue(e.getMessage().contains("Artificial Test Exception"));
}
- timerService.quiesceAndAwaitPending();
- synchronized (lock) {
- op.close();
- }
-
- shutdownTimerServiceAndWait(timerService);
op.dispose();
-
- if (error.get() != null) {
- throw new Exception(error.get());
- }
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
@@ -606,8 +488,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final int windowSize = 200;
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
// tumbling window that triggers every 50 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -616,9 +496,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
windowSize, windowSize);
OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
- timerService.setCurrentTime(0);
+ testHarness.setProcessingTime(0);
testHarness.setup();
testHarness.open();
@@ -626,7 +506,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// inject some elements
final int numElementsFirst = 700;
final int numElements = 1000;
-
+
for (int i = 0; i < numElementsFirst; i++) {
StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
testHarness.processElement(next);
@@ -656,8 +536,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- timerService = new TestProcessingTimeService();
- testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
testHarness.setup();
testHarness.restore(state);
@@ -669,7 +548,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(next);
}
- timerService.setCurrentTime(200);
+ testHarness.setProcessingTime(200);
// get and verify the result
List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -699,8 +578,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
// sliding window (200 msecs) every 50 msecs
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -708,10 +585,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
- timerService.setCurrentTime(0);
OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+
+ testHarness.setProcessingTime(0);
testHarness.setup();
testHarness.open();
@@ -749,8 +627,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
- timerService = new TestProcessingTimeService();
- testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+ testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
testHarness.setup();
testHarness.restore(state);
@@ -762,14 +639,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(next);
}
- timerService.setCurrentTime(50);
- timerService.setCurrentTime(100);
- timerService.setCurrentTime(150);
- timerService.setCurrentTime(200);
- timerService.setCurrentTime(250);
- timerService.setCurrentTime(300);
- timerService.setCurrentTime(350);
- timerService.setCurrentTime(400);
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+ testHarness.setProcessingTime(200);
+ testHarness.setProcessingTime(250);
+ testHarness.setProcessingTime(300);
+ testHarness.setProcessingTime(350);
+ testHarness.setProcessingTime(400);
// get and verify the result
List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
@@ -796,11 +673,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
public void testKeyValueStateInWindowFunctionTumbling() {
try {
final long twoSeconds = 2000;
-
- TestProcessingTimeService timerService = new TestProcessingTimeService();
StatefulFunction.globalCounts.clear();
-
+
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
new StatefulFunction(), fieldOneSelector,
@@ -809,16 +684,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
op,
new ExecutionConfig(),
- timerService,
fieldOneSelector,
BasicTypeInfo.INT_TYPE_INFO);
- timerService.setCurrentTime(0);
+ testHarness.setProcessingTime(0);
testHarness.open();
// because the window interval is so large, everything should be in one window
// and aggregate into one value per key
-
+
for (int i = 0; i < 10; i++) {
StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
testHarness.processElement(next1);
@@ -827,7 +701,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(next2);
}
- timerService.setCurrentTime(1000);
+ testHarness.setProcessingTime(1000);
int count1 = StatefulFunction.globalCounts.get(1);
int count2 = StatefulFunction.globalCounts.get(2);
@@ -851,32 +725,30 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestProcessingTimeService timerService = new TestProcessingTimeService();
-
StatefulFunction.globalCounts.clear();
-
+
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
new StatefulFunction(), fieldOneSelector,
IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
- timerService.setCurrentTime(0);
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
op,
new ExecutionConfig(),
- timerService,
fieldOneSelector,
BasicTypeInfo.INT_TYPE_INFO);
+ testHarness.setProcessingTime(0);
+
testHarness.open();
// because the window interval is so large, everything should be in one window
// and aggregate into one value per key
final int numElements = 100;
-
+
// because we do not release the lock here, these elements
for (int i = 0; i < numElements; i++) {
-
+
StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i));
@@ -888,14 +760,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
testHarness.processElement(next4);
}
- timerService.setCurrentTime(50);
- timerService.setCurrentTime(100);
- timerService.setCurrentTime(150);
- timerService.setCurrentTime(200);
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+ testHarness.setProcessingTime(200);
int count1 = StatefulFunction.globalCounts.get(1);
int count2 = StatefulFunction.globalCounts.get(2);
-
+
assertTrue(count1 >= 2 && count1 <= 2 * numElements);
assertEquals(count1, count2);
@@ -907,12 +779,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
fail(e.getMessage());
}
}
-
+
// ------------------------------------------------------------------------
-
+
private void assertInvalidParameter(long windowSize, long windowSlide) {
try {
- new AggregatingProcessingTimeWindowOperator<String, String>(
+ new AggregatingProcessingTimeWindowOperator<>(
mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE,
windowSize, windowSlide);
@@ -927,11 +799,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
// ------------------------------------------------------------------------
-
+
private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
private final int failAfterElements;
-
+
private int numElements;
FailingFunction(int failAfterElements) {
@@ -945,7 +817,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
if (numElements >= failAfterElements) {
throw new Exception("Artificial Test Exception");
}
-
+
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
@@ -961,7 +833,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Override
public void open(Configuration parameters) {
assertNotNull(getRuntimeContext());
-
+
// start with one, so the final count is correct and we test that we do not
// initialize with 0 always by default
state = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount", Integer.class, 1));
@@ -971,44 +843,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
state.update(state.value() + 1);
globalCounts.put(value1.f0, state.value());
-
+
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
// ------------------------------------------------------------------------
-
- private static StreamTask<?, ?> createMockTask() {
- Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
-
- StreamTask<?, ?> task = mock(StreamTask.class);
- when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
- when(task.getName()).thenReturn("Test task name");
- when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
- final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
- when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
-
- final Environment env = new DummyEnvironment("Test task name", 1, 0);
- when(task.getEnvironment()).thenReturn(env);
-
- return task;
- }
-
- private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService)
- {
- StreamTask<?, ?> mockTask = createMockTask();
- when(mockTask.getProcessingTimeService()).thenReturn(timerService);
- return mockTask;
- }
-
- private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
- return new StreamConfig(new Configuration());
- }
@SuppressWarnings({"unchecked", "rawtypes"})
- private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+ private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
List<T> result = new ArrayList<>();
for (Object in : input) {
if (in instanceof StreamRecord) {
@@ -1017,12 +860,4 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
return result;
}
-
- private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
- timers.shutdownService();
-
- while (!timers.isTerminated()) {
- Thread.sleep(2);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
deleted file mode 100644
index 42be131..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-
- private final List<T> elements = new ArrayList<>();
-
- private final int timeStampModulus;
-
-
- public CollectingOutput() {
- this.timeStampModulus = 0;
- }
-
- public CollectingOutput(int timeStampModulus) {
- this.timeStampModulus = timeStampModulus;
- }
-
- // ------------------------------------------------------------------------
-
- public List<T> getElements() {
- return elements;
- }
-
- public void waitForNElements(int n, long timeout) throws InterruptedException {
- long deadline = System.currentTimeMillis() + timeout;
- synchronized (elements) {
- long now;
- while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
- elements.wait(deadline - now);
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new UnsupportedOperationException("The output should not emit watermarks");
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- throw new UnsupportedOperationException("The output should not emit latency markers");
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- elements.add(record.getValue());
-
- if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
- throw new IllegalArgumentException("Invalid timestamp");
- }
- synchronized (elements) {
- elements.notifyAll();
- }
- }
-
- @Override
- public void close() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
deleted file mode 100644
index a7a71cf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends ProcessingTimeService {
-
- private volatile boolean terminated;
-
- @Override
- public long getCurrentProcessingTime() {
- return System.currentTimeMillis();
- }
-
- @Override
- public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
- return null;
- }
-
- @Override
- public boolean isTerminated() {
- return terminated;
- }
-
- @Override
- public void quiesceAndAwaitPending() {}
-
- @Override
- public void shutdownService() {
- terminated = true;
- }
-}