You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:50 UTC

[11/11] flink git commit: [FLINK-4460] Add WindowOperatorContractTest tests for late data emission

[FLINK-4460] Add WindowOperatorContractTest tests for late data emission


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2afe1b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2afe1b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2afe1b7

Branch: refs/heads/master
Commit: c2afe1b7ba748ccdb1b4233321805f31a6465bc1
Parents: f0a58f7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 11:42:56 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 90 +++++++++++++++++++-
 .../operators/windowing/WindowOperatorTest.java | 22 ++---
 .../util/AbstractStreamOperatorTestHarness.java |  5 +-
 3 files changed, 103 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index f70990f..c542b43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -298,6 +299,75 @@ public class WindowOperatorContractTest extends TestLogger {
 		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
+	/**
+	 * Verify that there is no late-data side output if the {@code WindowAssigner} does
+	 * not assign any windows.
+	 */
+	@Test
+	public void testNoLateSideOutputForSkippedWindows() throws Exception {
+
+		OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+		testHarness.open();
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Collections.<TimeWindow>emptyList());
+
+		testHarness.processWatermark(0);
+		testHarness.processElement(new StreamRecord<>(0, 5L));
+
+		verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+
+		assertTrue(testHarness.getSideOutput(lateOutputTag) == null || testHarness.getSideOutput(lateOutputTag).isEmpty());
+	}
+
+	@Test
+	public void testLateSideOutput() throws Exception {
+
+		OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+		testHarness.open();
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Collections.singletonList(new TimeWindow(0, 0)));
+
+		testHarness.processWatermark(20);
+		testHarness.processElement(new StreamRecord<>(0, 5L));
+
+		verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+
+		assertThat(testHarness.getSideOutput(lateOutputTag),
+				containsInAnyOrder(isStreamRecord(0, 5L)));
+
+		// we should also see side output if the WindowAssigner assigns no windows
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Collections.<TimeWindow>emptyList());
+
+		testHarness.processElement(new StreamRecord<>(0, 10L));
+
+		verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
+		verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), anyAssignerContext());
+
+		assertThat(testHarness.getSideOutput(lateOutputTag),
+				containsInAnyOrder(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
+
+	}
+
+
 	@Test
 	public void testAssignerIsInvokedOncePerElement() throws Exception {
 
@@ -2302,7 +2372,8 @@ public class WindowOperatorContractTest extends TestLogger {
 			Trigger<Integer, W> trigger,
 			long allowedLatenss,
 			StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
-			InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+			InternalWindowFunction<ACC, OUT, Integer, W> windowFunction,
+			OutputTag<Integer> lateOutputTag) throws Exception {
 
 		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
 			private static final long serialVersionUID = 1L;
@@ -2323,7 +2394,7 @@ public class WindowOperatorContractTest extends TestLogger {
 				windowFunction,
 				trigger,
 				allowedLatenss,
-				null /*late data output tag */);
+				lateOutputTag);
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
 				operator,
@@ -2331,6 +2402,21 @@ public class WindowOperatorContractTest extends TestLogger {
 				BasicTypeInfo.INT_TYPE_INFO);
 	}
 
+	private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+			WindowAssigner<Integer, W> assigner,
+			Trigger<Integer, W> trigger,
+			long allowedLatenss,
+			StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
+			InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+		return createWindowOperator(
+				assigner,
+				trigger,
+				allowedLatenss,
+				stateDescriptor,
+				windowFunction,
+				null /* late output tag */);
+	}
+
 
 	private static class Sum implements ReduceFunction<Integer> {
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index b38cb2e..8f8667b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1333,7 +1333,7 @@ public class WindowOperatorTest extends TestLogger {
 		TestHarnessUtil.assertOutputEqualsSorted(
 				"SideOutput was not correct.",
 				lateExpected,
-				testHarness.getSideOutput(lateOutputTag),
+				(Iterable) testHarness.getSideOutput(lateOutputTag),
 				new Tuple2ResultSortComparator());
 
 		testHarness.close();
@@ -1467,7 +1467,7 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(3999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
-		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) testHarness.getSideOutput(
 				lateOutputTag), new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
@@ -1548,7 +1548,7 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(25000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
-		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
 
@@ -1636,10 +1636,10 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+		ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
-		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) sideActual, new Tuple2ResultSortComparator());
 
 		testHarness.close();
 	}
@@ -1723,9 +1723,10 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+		ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
+				lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
-		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, (Iterable) sideActual, new Tuple2ResultSortComparator());
 		testHarness.close();
 	}
 
@@ -1897,7 +1898,8 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+		ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(
+				lateOutputTag);
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 		assertEquals(null, sideActual);
@@ -1989,7 +1991,7 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+		ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 		assertEquals(null, sideActual);
 
@@ -2082,7 +2084,7 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-7", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+		ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 		assertEquals(null, sideActual);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2afe1b7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 74012b0..945103c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -270,8 +270,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		return outputList;
 	}
 
-	public ConcurrentLinkedQueue<Object> getSideOutput(OutputTag tag) {
-		return sideOutputLists.get(tag);
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> tag) {
+		return (ConcurrentLinkedQueue) sideOutputLists.get(tag);
 	}
 
 	/**