You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:50 UTC
[11/11] flink git commit: [FLINK-4460] Add WindowOperatorContractTest
tests for late data emission
[FLINK-4460] Add WindowOperatorContractTest tests for late data emission
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2afe1b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2afe1b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2afe1b7
Branch: refs/heads/master
Commit: c2afe1b7ba748ccdb1b4233321805f31a6465bc1
Parents: f0a58f7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 11:42:56 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
.../windowing/WindowOperatorContractTest.java | 90 +++++++++++++++++++-
.../operators/windowing/WindowOperatorTest.java | 22 ++---
.../util/AbstractStreamOperatorTestHarness.java | 5 +-
3 files changed, 103 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index f70990f..c542b43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Matchers;
@@ -298,6 +299,75 @@ public class WindowOperatorContractTest extends TestLogger {
when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
+ /**
+ * Verify that there is no late-data side output if the {@code WindowAssigner} does
+ * not assign any windows.
+ */
+ @Test
+ public void testNoLateSideOutputForSkippedWindows() throws Exception {
+
+ OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.<TimeWindow>emptyList());
+
+ testHarness.processWatermark(0);
+ testHarness.processElement(new StreamRecord<>(0, 5L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+
+ assertTrue(testHarness.getSideOutput(lateOutputTag) == null || testHarness.getSideOutput(lateOutputTag).isEmpty());
+ }
+
+ @Test
+ public void testLateSideOutput() throws Exception {
+
+ OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 0)));
+
+ testHarness.processWatermark(20);
+ testHarness.processElement(new StreamRecord<>(0, 5L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+
+ assertThat(testHarness.getSideOutput(lateOutputTag),
+ containsInAnyOrder(isStreamRecord(0, 5L)));
+
+ // a late element should also be emitted to the side output if the WindowAssigner assigns no windows
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.<TimeWindow>emptyList());
+
+ testHarness.processElement(new StreamRecord<>(0, 10L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), anyAssignerContext());
+
+ assertThat(testHarness.getSideOutput(lateOutputTag),
+ containsInAnyOrder(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
+
+ }
+
+
@Test
public void testAssignerIsInvokedOncePerElement() throws Exception {
@@ -2302,7 +2372,8 @@ public class WindowOperatorContractTest extends TestLogger {
Trigger<Integer, W> trigger,
long allowedLatenss,
StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
- InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+ InternalWindowFunction<ACC, OUT, Integer, W> windowFunction,
+ OutputTag<Integer> lateOutputTag) throws Exception {
KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@@ -2323,7 +2394,7 @@ public class WindowOperatorContractTest extends TestLogger {
windowFunction,
trigger,
allowedLatenss,
- null /*late data output tag */);
+ lateOutputTag);
return new KeyedOneInputStreamOperatorTestHarness<>(
operator,
@@ -2331,6 +2402,21 @@ public class WindowOperatorContractTest extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO);
}
+ private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
+ InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+ return createWindowOperator(
+ assigner,
+ trigger,
+ allowedLatenss,
+ stateDescriptor,
+ windowFunction,
+ null /* late output tag */);
+ }
+
private static class Sum implements ReduceFunction<Integer> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index b38cb2e..8f8667b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1333,7 +1333,7 @@ public class WindowOperatorTest extends TestLogger {
TestHarnessUtil.assertOutputEqualsSorted(
"SideOutput was not correct.",
lateExpected,
- testHarness.getSideOutput(lateOutputTag),
+ (Iterable) testHarness.getSideOutput(lateOutputTag),
new Tuple2ResultSortComparator());
testHarness.close();
@@ -1467,7 +1467,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
- TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) testHarness.getSideOutput(
lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@@ -1548,7 +1548,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(25000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
- TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@@ -1636,10 +1636,10 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+ ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
- TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) sideActual, new Tuple2ResultSortComparator());
testHarness.close();
}
@@ -1723,9 +1723,10 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+ ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
+ lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
- TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) sideActual, new Tuple2ResultSortComparator());
testHarness.close();
}
@@ -1897,7 +1898,8 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+ ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
+ lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
@@ -1989,7 +1991,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+ ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
@@ -2082,7 +2084,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-7", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+ ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 74012b0..945103c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -270,8 +270,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
return outputList;
}
- public ConcurrentLinkedQueue<Object> getSideOutput(OutputTag tag) {
- return sideOutputLists.get(tag);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> tag) {
+ return (ConcurrentLinkedQueue) sideOutputLists.get(tag);
}
/**