You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/24 05:51:19 UTC

[2/3] flink git commit: [hotfix] Fix various small issues in WindowOperatorContractTest

[hotfix] Fix various small issues in WindowOperatorContractTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785ae632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785ae632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785ae632

Branch: refs/heads/release-1.2
Commit: 785ae63292c6df889cd4da42d27ffcbe5862546d
Parents: e27b697
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 22 17:02:15 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:25:03 2017 +0800

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 91 +++++++++-----------
 1 file changed, 41 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/785ae632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 1d51b45..a206455 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -36,7 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -154,7 +155,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	@SuppressWarnings("unchecked")
 	static Iterable<Integer> intIterable(Integer... values) {
-		return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+		return (Iterable<Integer>) argThat(contains(values));
 	}
 
 	static TimeWindow anyTimeWindow() {
@@ -246,51 +247,51 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 	private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	@Test
@@ -450,7 +451,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
-				containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
 	}
 
 	@Test
@@ -464,7 +465,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 
-	private  void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -503,7 +504,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
-				containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
 	}
 
 	@Test
@@ -997,9 +998,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	/**
 	 * Verify that we neither invoke the trigger nor the window function if a timer
-	 * for a non-existent merging window fires.
+	 * for an empty merging window fires.
 	 */
-	public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1063,7 +1064,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we neither invoke the trigger nor the window function if a timer
 	 * fires for a merging window that was already garbage collected.
 	 */
-	public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1096,7 +1097,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			}
 		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
 
-
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
 		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
@@ -1241,7 +1241,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
 	}
 
-	public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1302,8 +1302,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback());
-		verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
+		verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+		verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+
 		verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
 
 
@@ -1322,7 +1323,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	/**
 	 * Verify that windows are merged eagerly, if possible.
 	 */
-	public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		// in this test we only have one state window and windows are eagerly
 		// merged into the first window
 
@@ -1386,8 +1387,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		shouldMergeWindows(
 				mockAssigner,
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
 				new TimeWindow(0, 4));
 
 		// don't register a timer or update state in onElement, this checks
@@ -1421,7 +1422,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we only keep one of the underlying state windows. This test also verifies that
 	 * GC timers are correctly deleted when merging windows.
 	 */
-	public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1492,8 +1493,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		shouldMergeWindows(
 				mockAssigner,
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
 				new TimeWindow(0, 4));
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
@@ -1548,7 +1549,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
 	}
 
-	public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1593,7 +1594,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
 	}
 
-	public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1697,7 +1698,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testGarbageCollectionTimer(new ProcessingTimeAdaptor());
 	}
 
-	public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1742,7 +1743,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
 	}
 
-	public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1798,7 +1799,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
 	}
 
-	public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we correctly clean up even when a purging trigger has purged
 	 * window state.
 	 */
-	public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1939,7 +1940,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we correctly clean up even when a purging trigger has purged
 	 * window state.
 	 */
-	public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2005,7 +2006,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor());
 	}
 
-	public void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2050,12 +2051,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.getOutput().size());
 		assertEquals(0, testHarness.numKeyedStateEntries());
 
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+		shouldFireOnElement(mockTrigger);
 
 		// 20 is just at the limit, window.maxTime() is 1 and allowed lateness is 20
 		testHarness.processWatermark(new Watermark(20));
@@ -2089,12 +2085,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.getOutput().size());
 		assertEquals(0, testHarness.numKeyedStateEntries());
 
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+		shouldFireOnElement(mockTrigger);
 
 		// window.maxTime() == 1 plus 20L of allowed lateness
 		testHarness.processWatermark(new Watermark(21));