You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/23 15:30:08 UTC
[1/3] flink git commit: [hotfix] Fix various small issues in
WindowOperatorContractTest
Repository: flink
Updated Branches:
refs/heads/master 8319a457d -> 68289b1a5
[hotfix] Fix various small issues in WindowOperatorContractTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25d52e4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25d52e4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25d52e4d
Branch: refs/heads/master
Commit: 25d52e4df216dc54d2d82e1f0b449871bda4ba74
Parents: 3c4b156
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 22 17:02:15 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 23 23:29:01 2017 +0800
----------------------------------------------------------------------
.../windowing/WindowOperatorContractTest.java | 97 +++++++++-----------
1 file changed, 44 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/25d52e4d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index abc7b3e..aaea8b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -36,7 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -155,7 +156,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@SuppressWarnings("unchecked")
static Iterable<Integer> intIterable(Integer... values) {
- return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+ return (Iterable<Integer>) argThat(contains(values));
}
static TimeWindow anyTimeWindow() {
@@ -247,55 +248,55 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
- when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
/**
- * Verify that there is no late-date side output if the {@code WindowAssigner} does
+ * Verify that there is no late-data side output if the {@code WindowAssigner} does
* not assign any windows.
*/
@Test
@@ -346,7 +347,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
assertThat(testHarness.getSideOutput(lateOutputTag),
- containsInAnyOrder(isStreamRecord(0, 5L)));
+ contains(isStreamRecord(0, 5L)));
// we should also see side output if the WindowAssigner assigns no windows
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
@@ -358,7 +359,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), anyAssignerContext());
assertThat(testHarness.getSideOutput(lateOutputTag),
- containsInAnyOrder(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
+ contains(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
}
@@ -520,7 +521,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
- containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+ contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
}
@Test
@@ -534,7 +535,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
- private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -573,7 +574,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
- containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+ contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
}
@Test
@@ -1067,9 +1068,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
/**
* Verify that we neither invoke the trigger nor the window function if a timer
- * for an empty merging window.
+ * for an empty merging window fires.
*/
- public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1133,7 +1134,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we neither invoke the trigger nor the window function if a timer
* fires for a merging window that was already garbage collected.
*/
- public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1166,7 +1167,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
-
testHarness.processElement(new StreamRecord<>(0, 0L));
assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
@@ -1311,7 +1311,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
}
- public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1372,8 +1372,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback());
- verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
+ verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+ verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+
verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
@@ -1392,7 +1393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
/**
* Verify that windows are merged eagerly, if possible.
*/
- public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
// in this test we only have one state window and windows are eagerly
// merged into the first window
@@ -1456,8 +1457,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
shouldMergeWindows(
mockAssigner,
- Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
- Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
new TimeWindow(0, 4));
// don't register a timer or update state in onElement, this checks
@@ -1491,7 +1492,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we only keep one of the underlying state windows. This test also verifies that
* GC timers are correctly deleted when merging windows.
*/
- public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1562,8 +1563,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
shouldMergeWindows(
mockAssigner,
- Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
- Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
new TimeWindow(0, 4));
testHarness.processElement(new StreamRecord<>(0, 0L));
@@ -1618,7 +1619,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
}
- public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1663,7 +1664,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
}
- public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
@@ -1767,7 +1768,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testGarbageCollectionTimer(new ProcessingTimeAdaptor());
}
- public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1812,7 +1813,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
}
- public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
}
- public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1938,7 +1939,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we correctly clean up even when a purging trigger has purged
* window state.
*/
- public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2009,7 +2010,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we correctly clean up even when a purging trigger has purged
* window state.
*/
- public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2075,7 +2076,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor());
}
- public void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
+ private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2120,12 +2121,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
- doAnswer(new Answer<TriggerResult>() {
- @Override
- public TriggerResult answer(InvocationOnMock invocation) throws Exception {
- return TriggerResult.FIRE;
- }
- }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ shouldFireOnElement(mockTrigger);
// 20 is just at the limit, window.maxTime() is 1 and allowed lateness is 20
testHarness.processWatermark(new Watermark(20));
@@ -2159,12 +2155,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
- doAnswer(new Answer<TriggerResult>() {
- @Override
- public TriggerResult answer(InvocationOnMock invocation) throws Exception {
- return TriggerResult.FIRE;
- }
- }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ shouldFireOnElement(mockTrigger);
// window.maxTime() == 1 plus 20L of allowed lateness
testHarness.processWatermark(new Watermark(21));
[3/3] flink git commit: [FLINK-5972] Don't allow shrinking merging
windows
Posted by tz...@apache.org.
[FLINK-5972] Don't allow shrinking merging windows
This closes #3587.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68289b1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68289b1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68289b1a
Branch: refs/heads/master
Commit: 68289b1a52db7157d23085850ec947e78e729f01
Parents: 25d52e4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 23 23:29:02 2017 +0800
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 13 ++++
.../operators/windowing/WindowOperator.java | 15 +++-
.../windowing/WindowOperatorContractTest.java | 80 ++++++++++++++++++++
3 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 951f661..24c8d32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -121,6 +121,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+
+ if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+ throw new UnsupportedOperationException("The end timestamp of an " +
+ "event-time window cannot become earlier than the current watermark " +
+ "by merging. Current watermark: " + internalTimerService.currentWatermark() +
+ " window: " + mergeResult);
+ } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+ throw new UnsupportedOperationException("The end timestamp of a " +
+ "processing-time window cannot become earlier than the current processing time " +
+ "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+ " window: " + mergeResult);
+ }
+
context.key = key;
context.window = mergeResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b4283d8..3745659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -131,7 +131,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@code window.maxTimestamp + allowedLateness} landmark.
* </ul>
*/
- private final long allowedLateness;
+ protected final long allowedLateness;
/**
* {@link OutputTag} to use for late arriving events. Elements for which
@@ -352,6 +352,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+
+ if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+ throw new UnsupportedOperationException("The end timestamp of an " +
+ "event-time window cannot become earlier than the current watermark " +
+ "by merging. Current watermark: " + internalTimerService.currentWatermark() +
+ " window: " + mergeResult);
+ } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+ throw new UnsupportedOperationException("The end timestamp of a " +
+ "processing-time window cannot become earlier than the current processing time " +
+ "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+ " window: " + mergeResult);
+ }
+
context.key = key;
context.window = mergeResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index aaea8b1..8aae46a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1479,6 +1479,86 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
@Test
+ public void testRejectShrinkingMergingEventTimeWindows() throws Exception {
+ testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception {
+ testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * A misbehaving {@code WindowAssigner} can cause a window to become late by merging if
+ * it moves the end-of-window time before the watermark. This verifies that we don't allow that.
+ */
+ void testRejectShrinkingMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ int allowedLateness = 10;
+
+ if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+ // we don't have allowed lateness for processing time
+ allowedLateness = 0;
+ }
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, 0);
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 22)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+ timeAdaptor.advanceTime(testHarness, 20);
+
+ // our window should still be there
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+ // the result timestamp is ... + 2 because a watermark t says no element with
+ // timestamp <= t will come in the future and because window ends are exclusive:
+ // a window (0, 12) will have 11 as maxTimestamp. With the watermark at 20, 10 would
+ // already be considered late
+ shouldMergeWindows(
+ mockAssigner,
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+ new TimeWindow(0, 20 - allowedLateness + 2));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // now merge it to a window that is just late
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+ shouldMergeWindows(
+ mockAssigner,
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+ new TimeWindow(0, 20 - allowedLateness + 1));
+
+ expectedException.expect(UnsupportedOperationException.class);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+ }
+
+ @Test
public void testMergingOfExistingEventTimeWindows() throws Exception {
testMergingOfExistingWindows(new EventTimeAdaptor());
}
[2/3] flink git commit: [hotfix] Add
EvictingWindowOperatorContractTest
Posted by tz...@apache.org.
[hotfix] Add EvictingWindowOperatorContractTest
The existing WindowOperatorContractTest is turned into a test base while
RegularWindowOperatorContractTest tests WindowOperator and
EvictingWindowOperatorContractTest tests EvictingWindowOperator. For this to
work, the base tests now always use List windows and we have specific
tests for reducing/folding windows in RegularWindowOperatorContractTest.
This also patches in the missing side output support for
EvictingWindowOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c4b1565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c4b1565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c4b1565
Branch: refs/heads/master
Commit: 3c4b156527e9ca7cb2dafdda706913e91d688133
Parents: 8319a45
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 15:00:24 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 23 23:29:01 2017 +0800
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 26 +-
.../operators/windowing/WindowOperator.java | 4 +-
.../EvictingWindowOperatorContractTest.java | 99 ++++++
.../RegularWindowOperatorContractTest.java | 288 +++++++++++++++++
.../windowing/WindowOperatorContractTest.java | 310 +++++--------------
5 files changed, 484 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8dfc717..951f661 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -99,16 +99,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(
- element.getValue(),
- element.getTimestamp(),
- windowAssignerContext);
+ final Collection<W> elementWindows = windowAssigner.assignWindows(
+ element.getValue(), element.getTimestamp(), windowAssignerContext);
- @SuppressWarnings("unchecked")
- final K key = (K) getKeyedStateBackend().getCurrentKey();
+ // true if the element is handled by none of the assigned elementWindows
+ boolean isSkippedElement = true;
- if (windowAssigner instanceof MergingWindowAssigner) {
+ final K key = this.<K>getKeyedStateBackend().getCurrentKey();
+ if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window : elementWindows) {
@@ -138,11 +137,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
}
});
- // check if the window is already inactive
+ // drop if the window is already late
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
+ isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
@@ -174,6 +174,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
registerCleanupTimer(actualWindow);
}
+ // need to make sure to persist the merging window set to state
mergingWindows.persist();
} else {
for (W window : elementWindows) {
@@ -182,6 +183,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (isWindowLate(window)) {
continue;
}
+ isSkippedElement = false;
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
@@ -208,6 +210,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
registerCleanupTimer(window);
}
}
+
+ // side output the input element if all of the following hold:
+ // - the element was not handled by any window
+ // - the late-arriving output tag has been set
+ // - the windowAssigner is event time and the current timestamp + allowed lateness is no less than the element timestamp
+ if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
+ sideOutput(element);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9ce1ae7..b4283d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -138,7 +138,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
* be emitted to this.
*/
- private final OutputTag<IN> lateDataOutputTag;
+ protected final OutputTag<IN> lateDataOutputTag;
// ------------------------------------------------------------------------
// State that is not checkpointed
@@ -574,7 +574,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*
* @param element skipped late arriving element to side output
*/
- private void sideOutput(StreamRecord<IN> element){
+ protected void sideOutput(StreamRecord<IN> element){
output.collect(lateDataOutputTag, element);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
new file mode 100644
index 0000000..7af4506
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * These tests verify that {@link EvictingWindowOperator} correctly interacts with the other
+ * windowing components: {@link WindowAssigner},
+ * {@link Trigger},
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class EvictingWindowOperatorContractTest extends WindowOperatorContractTest {
+
+ protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction,
+ OutputTag<Integer> lateOutputTag) throws Exception {
+
+ KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ };
+
+ ListStateDescriptor<StreamRecord<Integer>> intListDescriptor =
+ new ListStateDescriptor<>(
+ "int-list",
+ (TypeSerializer<StreamRecord<Integer>>) new StreamElementSerializer(IntSerializer.INSTANCE));
+
+ @SuppressWarnings("unchecked")
+ EvictingWindowOperator<Integer, Integer, OUT, W> operator = new EvictingWindowOperator<>(
+ assigner,
+ assigner.getWindowSerializer(new ExecutionConfig()),
+ keySelector,
+ IntSerializer.INSTANCE,
+ intListDescriptor,
+ windowFunction,
+ trigger,
+ CountEvictor.<W>of(100),
+ allowedLatenss,
+ lateOutputTag);
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+ }
+
+ protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception {
+
+ return createWindowOperator(
+ assigner,
+ trigger,
+ allowedLatenss,
+ windowFunction,
+ null /* late output tag */);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
new file mode 100644
index 0000000..11508c5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * components: {@link WindowAssigner},
+ * {@link Trigger},
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class RegularWindowOperatorContractTest extends WindowOperatorContractTest {
+
+ @Test
+ public void testReducingWindow() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+
+ ReducingStateDescriptor<Integer> intReduceSumDescriptor =
+ new ReducingStateDescriptor<>(
+ "int-reduce",
+ new ReduceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer a, Integer b) throws Exception {
+ return a + b;
+ }
+ },
+ IntSerializer.INSTANCE);
+
+ final ValueStateDescriptor<String> valueStateDescriptor =
+ new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
+
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ // insert two elements without firing
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+ }
+
+ @Test
+ public void testFoldingWindow() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
+ new FoldingStateDescriptor<>(
+ "int-fold",
+ 0,
+ new FoldFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer fold(Integer accumulator, Integer value) throws Exception {
+ return accumulator + value;
+ }
+ },
+ IntSerializer.INSTANCE);
+
+ final ValueStateDescriptor<String> valueStateDescriptor =
+ new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ // insert two elements without firing
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+ }
+
+ /**
+ * Special method for creating a {@link WindowOperator} with a custom {@link StateDescriptor}
+ * for the window contents state.
+ */
+ private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
+ InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+
+ KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>(
+ assigner,
+ assigner.getWindowSerializer(new ExecutionConfig()),
+ keySelector,
+ IntSerializer.INSTANCE,
+ stateDescriptor,
+ windowFunction,
+ trigger,
+ allowedLatenss,
+ null /* late output tag */);
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+ }
+
+ @Override
+ protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction,
+ OutputTag<Integer> lateOutputTag) throws Exception {
+
+ KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ };
+
+ ListStateDescriptor<Integer> intListDescriptor =
+ new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
+
+
+ @SuppressWarnings("unchecked")
+ WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>(
+ assigner,
+ assigner.getWindowSerializer(new ExecutionConfig()),
+ keySelector,
+ IntSerializer.INSTANCE,
+ intListDescriptor,
+ windowFunction,
+ trigger,
+ allowedLatenss,
+ lateOutputTag);
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+ }
+
+ @Override
+ protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ WindowAssigner<Integer, W> assigner,
+ Trigger<Integer, W> trigger,
+ long allowedLatenss,
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception {
+
+ return createWindowOperator(
+ assigner,
+ trigger,
+ allowedLatenss,
+ windowFunction,
+ null /* late output tag */);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index c542b43..abc7b3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -18,20 +18,32 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -42,55 +54,38 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
-
/**
- * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * Base for window operator tests that verify correct interaction with the other windowing
* components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
* {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
* {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
*
* <p>These tests document the implicit contract that exists between the windowing components.
*/
-public class WindowOperatorContractTest extends TestLogger {
+public abstract class WindowOperatorContractTest extends TestLogger {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
private static ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null);
- private static ListStateDescriptor<Integer> intListDescriptor =
- new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
-
- private static ReducingStateDescriptor<Integer> intReduceSumDescriptor =
- new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE);
-
- private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
- new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE);
-
static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception {
@SuppressWarnings("unchecked")
InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class);
@@ -313,7 +308,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, lateOutputTag);
testHarness.open();
@@ -338,7 +333,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, lateOutputTag);
testHarness.open();
@@ -376,7 +371,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -401,7 +396,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -425,7 +420,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -472,7 +467,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -488,100 +483,6 @@ public class WindowOperatorContractTest extends TestLogger {
}
@Test
- public void testReducingWindow() throws Exception {
-
- WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
- Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
- InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
-
- KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
-
- testHarness.open();
-
- when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
- .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
-
- assertEquals(0, testHarness.getOutput().size());
- assertEquals(0, testHarness.numKeyedStateEntries());
-
- // insert two elements without firing
- testHarness.processElement(new StreamRecord<>(1, 0L));
- testHarness.processElement(new StreamRecord<>(1, 0L));
-
- doAnswer(new Answer<TriggerResult>() {
- @Override
- public TriggerResult answer(InvocationOnMock invocation) throws Exception {
- TimeWindow window = (TimeWindow) invocation.getArguments()[2];
- Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
- context.registerEventTimeTimer(window.getEnd());
- context.getPartitionedState(valueStateDescriptor).update("hello");
- return TriggerResult.FIRE;
- }
- }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
-
- testHarness.processElement(new StreamRecord<>(1, 0L));
-
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
- // clear is only called at cleanup time/GC time
- verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
-
- // FIRE should not purge contents
- assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
- assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
- }
-
- @Test
- public void testFoldingWindow() throws Exception {
-
- WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
- Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
- InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
-
- KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
-
- testHarness.open();
-
- when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
- .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
-
- assertEquals(0, testHarness.getOutput().size());
- assertEquals(0, testHarness.numKeyedStateEntries());
-
- // insert two elements without firing
- testHarness.processElement(new StreamRecord<>(1, 0L));
- testHarness.processElement(new StreamRecord<>(1, 0L));
-
- doAnswer(new Answer<TriggerResult>() {
- @Override
- public TriggerResult answer(InvocationOnMock invocation) throws Exception {
- TimeWindow window = (TimeWindow) invocation.getArguments()[2];
- Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
- context.registerEventTimeTimer(window.getEnd());
- context.getPartitionedState(valueStateDescriptor).update("hello");
- return TriggerResult.FIRE;
- }
- }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
-
- testHarness.processElement(new StreamRecord<>(1, 0L));
-
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
- // clear is only called at cleanup time/GC time
- verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
-
- // FIRE should not purge contents
- assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
- assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
- }
-
- @Test
public void testEmittingFromWindowFunction() throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -589,7 +490,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -640,7 +541,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -683,7 +584,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -725,7 +626,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -768,7 +669,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -813,7 +714,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -866,7 +767,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -924,7 +825,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -986,7 +887,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1047,7 +948,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1115,7 +1016,7 @@ public class WindowOperatorContractTest extends TestLogger {
mock(InternalWindowFunction.class);
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1179,7 +1080,7 @@ public class WindowOperatorContractTest extends TestLogger {
mock(InternalWindowFunction.class);
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1243,7 +1144,7 @@ public class WindowOperatorContractTest extends TestLogger {
mock(InternalWindowFunction.class);
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1313,7 +1214,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1360,7 +1261,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1418,7 +1319,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1459,7 +1360,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1501,7 +1402,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1598,7 +1499,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1681,7 +1582,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1725,7 +1626,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1773,7 +1674,7 @@ public class WindowOperatorContractTest extends TestLogger {
assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1798,7 +1699,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -1827,7 +1728,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -1873,7 +1774,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -1918,7 +1819,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -1974,7 +1875,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2044,7 +1945,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2115,7 +2016,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2181,7 +2082,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2209,7 +2110,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2248,7 +2149,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
testHarness.open();
@@ -2287,7 +2188,7 @@ public class WindowOperatorContractTest extends TestLogger {
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.open();
@@ -2332,11 +2233,11 @@ public class WindowOperatorContractTest extends TestLogger {
}).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
// only fire on the timestamp==0L timers, not the gc timers
- when(mockTrigger.onEventTime(eq(0L), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ when(mockTrigger.onEventTime(eq(0L), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
mockWindowFunction = mockWindowFunction();
- testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+ testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
testHarness.setup();
testHarness.initializeState(snapshot);
@@ -2367,76 +2268,19 @@ public class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.numEventTimeTimers());
}
- private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
WindowAssigner<Integer, W> assigner,
Trigger<Integer, W> trigger,
- long allowedLatenss,
- StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
- InternalWindowFunction<ACC, OUT, Integer, W> windowFunction,
- OutputTag<Integer> lateOutputTag) throws Exception {
-
- KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
+ long allowedLateness,
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction,
+ OutputTag<Integer> lateOutputTag) throws Exception;
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- };
-
- @SuppressWarnings("unchecked")
- WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>(
- assigner,
- assigner.getWindowSerializer(new ExecutionConfig()),
- keySelector,
- IntSerializer.INSTANCE,
- stateDescriptor,
- windowFunction,
- trigger,
- allowedLatenss,
- lateOutputTag);
-
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator,
- keySelector,
- BasicTypeInfo.INT_TYPE_INFO);
- }
-
- private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+ protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
WindowAssigner<Integer, W> assigner,
Trigger<Integer, W> trigger,
long allowedLatenss,
- StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
- InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
- return createWindowOperator(
- assigner,
- trigger,
- allowedLatenss,
- stateDescriptor,
- windowFunction,
- null /* late output tag */);
- }
-
+ InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception;
- private static class Sum implements ReduceFunction<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }
-
- private static class FoldSum implements FoldFunction<Integer, Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer fold(
- Integer accumulator,
- Integer value) throws Exception {
- return accumulator + value;
- }
- }
private interface TimeDomainAdaptor {