You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 14:01:42 UTC
[02/10] flink git commit: [FLINK-4552] Refactor
WindowOperator/Trigger Tests
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
new file mode 100644
index 0000000..47ead66
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -0,0 +1,2495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
+ * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class WindowOperatorContractTest extends TestLogger {
+
+ private static ValueStateDescriptor<String> valueStateDescriptor =
+ new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null);
+
+ private static ListStateDescriptor<Integer> intListDescriptor =
+ new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
+
+ private static ReducingStateDescriptor<Integer> intReduceSumDescriptor =
+ new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE);
+
+ private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
+ new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE);
+
+ static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception {
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class);
+
+ return mockWindowFunction;
+ }
+
+ static <T, W extends Window> Trigger<T, W> mockTrigger() throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger<T, W> mockTrigger = mock(Trigger.class);
+
+ when(mockTrigger.onElement(Matchers.<T>any(), anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+
+ return mockTrigger;
+ }
+
+ static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner() throws Exception {
+ @SuppressWarnings("unchecked")
+ WindowAssigner<T, TimeWindow> mockAssigner = mock(WindowAssigner.class);
+
+ when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+
+ return mockAssigner;
+ }
+
+ static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
+ @SuppressWarnings("unchecked")
+ WindowAssigner<T, GlobalWindow> mockAssigner = mock(WindowAssigner.class);
+
+ when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new GlobalWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+ when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));
+
+ return mockAssigner;
+ }
+
+
+ static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception {
+ @SuppressWarnings("unchecked")
+ MergingWindowAssigner<T, TimeWindow> mockAssigner = mock(MergingWindowAssigner.class);
+
+ when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+
+ return mockAssigner;
+ }
+
+
+ static WindowAssigner.WindowAssignerContext anyAssignerContext() {
+ return Mockito.any();
+ }
+
+ static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.any();
+ }
+
+ static <T> Collector<T> anyCollector() {
+ return Mockito.any();
+ }
+
+ static Iterable<Integer> anyIntIterable() {
+ return Mockito.any();
+ }
+
+ @SuppressWarnings("unchecked")
+ static Iterable<Integer> intIterable(Integer... values) {
+ return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+ }
+
+ static TimeWindow anyTimeWindow() {
+ return Mockito.any();
+ }
+
+ static Trigger.OnMergeContext anyOnMergeContext() {
+ return Mockito.any();
+ }
+
+ static MergingWindowAssigner.MergeCallback anyMergeCallback() {
+ return Mockito.any();
+ }
+
+
+ static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.deleteEventTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerProcessingTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.deleteProcessingTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, W extends Window> void shouldMergeWindows(final MergingWindowAssigner<T, W> assigner, final Collection<? extends W> expectedWindows, final Collection<W> toMerge, final W mergeResult) {
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Collection<W> windows = (Collection<W>) invocation.getArguments()[0];
+
+ MergingWindowAssigner.MergeCallback callback = (MergingWindowAssigner.MergeCallback) invocation.getArguments()[1];
+
+ // verify the expected windows
+ assertThat(windows, containsInAnyOrder(expectedWindows.toArray()));
+
+ callback.merge(toMerge, mergeResult);
+ return null;
+ }
+ })
+ .when(assigner).mergeWindows(anyCollection(), Matchers.<MergingWindowAssigner.MergeCallback>anyObject());
+ }
+
+ private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ @Test
+ public void testAssignerIsInvokedOncePerElement() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 0)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(0L), anyAssignerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockAssigner, times(2)).assignWindows(eq(0), eq(0L), anyAssignerContext());
+
+ }
+
+ @Test
+ public void testAssignerWithMultipleWindows() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ shouldFireOnElement(mockTrigger);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ }
+
+ @Test
+ public void testWindowsDontInterfere() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 2)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 1)));
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ // no output so far
+ assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
+
+ // state for two windows
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ assertEquals(2, testHarness.numEventTimeTimers());
+
+ // now we fire
+ shouldFireOnElement(mockTrigger);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 1)));
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 2)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
+ }
+
+ @Test
+ public void testOnElementCalledPerWindow() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ testHarness.processElement(new StreamRecord<>(42, 1L));
+
+ verify(mockTrigger).onElement(eq(42), eq(1L), eq(new TimeWindow(2, 4)), anyTriggerContext());
+ verify(mockTrigger).onElement(eq(42), eq(1L), eq(new TimeWindow(0, 2)), anyTriggerContext());
+
+ verify(mockTrigger, times(2)).onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ @Test
+ public void testReducingWindow() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ // insert two elements without firing
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+ }
+
+ @Test
+ public void testFoldingWindow() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ // insert two elements without firing
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(1, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+ }
+
+ @Test
+ public void testEmittingFromWindowFunction() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 2)));
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ out.collect("Hallo");
+ out.collect("Ciao");
+ return null;
+ }
+ }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+
+ assertThat(testHarness.extractOutputStreamRecords(),
+ containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+ }
+
+ @Test
+ public void testEmittingFromWindowFunctionOnEventTime() throws Exception {
+ testEmittingFromWindowFunction(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testEmittingFromWindowFunctionOnProcessingTime() throws Exception {
+ testEmittingFromWindowFunction(new ProcessingTimeAdaptor());
+ }
+
+
+ private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new TimeWindow(0, 2)));
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ out.collect("Hallo");
+ out.collect("Ciao");
+ return null;
+ }
+ }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
+ assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
+
+ timeAdaptor.shouldFireOnTime(mockTrigger);
+
+ timeAdaptor.advanceTime(testHarness, 1L);
+
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+
+ assertThat(testHarness.extractOutputStreamRecords(),
+ containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+ }
+
+ @Test
+ public void testOnElementContinue() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // CONTINUE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+
+ // there should be no firing
+ assertEquals(0, testHarness.getOutput().size());
+ }
+
+ @Test
+ public void testOnElementFire() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+ }
+
+ @Test
+ public void testOnElementFireAndPurge() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(window.getEnd());
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.FIRE_AND_PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE_AND_PURGE should purge contents
+ assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around until GC time
+
+ // timers will stick around
+ assertEquals(4, testHarness.numEventTimeTimers());
+ }
+
+ @Test
+ public void testOnElementPurge() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ context.registerEventTimeTimer(0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // PURGE should purge contents
+ assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around until GC time
+
+ // timers will stick around
+ assertEquals(4, testHarness.numEventTimeTimers()); // trigger timer and GC timer
+
+ // no output
+ assertEquals(0, testHarness.getOutput().size());
+ }
+
+ @Test
+ public void testOnEventTimeContinue() throws Exception {
+ testOnTimeContinue(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testOnProcessingTimeContinue() throws Exception {
+ testOnTimeContinue(new ProcessingTimeAdaptor());
+ }
+
+ private void testOnTimeContinue(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ // this should register two timers because we have two windows
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // we don't want to fire the cleanup timer
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.shouldContinueOnTime(mockTrigger);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents plus trigger state for two windows
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows
+
+ timeAdaptor.advanceTime(testHarness, 0L);
+
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // only gc timers left
+
+ // there should be no firing
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ }
+
+ @Test
+ public void testOnEventTimeFire() throws Exception {
+ testOnTimeFire(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testOnProcessingTimeFire() throws Exception {
+ testOnTimeFire(new ProcessingTimeAdaptor());
+ }
+
+ private void testOnTimeFire(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.shouldFireOnTime(mockTrigger);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows
+
+ timeAdaptor.advanceTime(testHarness, 0L);
+
+ verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE should not purge contents
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // only gc timers left
+ }
+
+ @Test
+ public void testOnEventTimeFireAndPurge() throws Exception {
+ testOnTimeFireAndPurge(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testOnProcessingTimeFireAndPurge() throws Exception {
+ testOnTimeFireAndPurge(new ProcessingTimeAdaptor());
+ }
+
+ private void testOnTimeFireAndPurge(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.shouldFireAndPurgeOnTime(mockTrigger);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows
+
+ timeAdaptor.advanceTime(testHarness, 0L);
+
+ verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // FIRE_AND_PURGE should purge contents
+ assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state stays until GC time
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // gc timers are still there
+ }
+
+ @Test
+ public void testOnEventTimePurge() throws Exception {
+ testOnTimePurge(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testOnProcessingTimePurge() throws Exception {
+ testOnTimePurge(new ProcessingTimeAdaptor());
+ }
+
+ private void testOnTimePurge(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(4, 6)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 1L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.shouldPurgeOnTime(mockTrigger);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows
+
+ timeAdaptor.advanceTime(testHarness, 1L);
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ // PURGE should purge contents
+ assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // gc timers are still there
+
+ // still no output
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ }
+
+ @Test
+ public void testNoEventTimeFiringForPurgedWindow() throws Exception {
+ testNoTimerFiringForPurgedWindow(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testNoProcessingTimeFiringForPurgedWindow() throws Exception {
+ testNoTimerFiringForPurgedWindow(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * Verify that we neither invoke the trigger nor the window function if a timer
+ * for a non-existent window fires.
+ */
+ private void testNoTimerFiringForPurgedWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction =
+ mock(InternalWindowFunction.class);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(0, testHarness.numKeyedStateEntries()); // not contents or state
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer
+
+ timeAdaptor.advanceTime(testHarness, 0L);
+
+ // trigger is not called if there is no more window (timer is silently ignored)
+ timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
+
+ verify(mockWindowFunction, never())
+ .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
+ }
+
+ @Test
+ public void testNoEventTimeFiringForPurgedMergingWindow() throws Exception {
+ testNoTimerFiringForPurgedMergingWindow(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testNoProcessingTimeFiringForPurgedMergingWindow() throws Exception {
+ testNoTimerFiringForPurgedMergingWindow(new ProcessingTimeAdaptor());
+ }
+
+
+ /**
+ * Verify that we neither invoke the trigger nor the window function if a timer
+ * for a non-existent merging window fires.
+ */
+ public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction =
+ mock(InternalWindowFunction.class);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4)));
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(1, testHarness.numKeyedStateEntries()); // just the merging window set
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer
+
+ timeAdaptor.advanceTime(testHarness, 0L);
+
+ // trigger is not called if there is no more window (timer is silently ignored)
+ timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
+
+ verify(mockWindowFunction, never())
+ .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
+ }
+
+ @Test
+ public void testEventTimeTimerCreationAndDeletion() throws Exception {
+ testTimerCreationAndDeletion(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeTimerCreationAndDeletion() throws Exception {
+ testTimerCreationAndDeletion(new ProcessingTimeAdaptor());
+ }
+
+ private void testTimerCreationAndDeletion(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 2)));
+
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(3, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+
+ timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 42);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 17);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+ }
+
+ @Test
+ public void testEventTimeTimerFiring() throws Exception {
+ testTimerFiring(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeTimerFiring() throws Exception {
+ testTimerFiring(new ProcessingTimeAdaptor());
+ }
+
+
+ private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 100)));
+
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+
+ timeAdaptor.advanceTime(testHarness, 1);
+
+ timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 1L, new TimeWindow(0, 100));
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+ assertEquals(3, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+
+ // doesn't do anything
+ timeAdaptor.advanceTime(testHarness, 15);
+
+ // so still the same
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+
+ timeAdaptor.advanceTime(testHarness, 42);
+
+ timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 17L, new TimeWindow(0, 100));
+ timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 42L, new TimeWindow(0, 100));
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(3), null, null);
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window
+ }
+
+ @Test
+ public void testEventTimeDeletedTimerDoesNotFire() throws Exception {
+ testDeletedTimerDoesNotFire(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeDeletedTimerDoesNotFire() throws Exception {
+ testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
+ }
+
+ public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 100)));
+
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer
+
+ timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 1);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 2);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer
+
+ timeAdaptor.advanceTime(testHarness, 50L);
+
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(0), 1L, null);
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 2L, new TimeWindow(0, 100));
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer
+ }
+
+ @Test
+ public void testMergeWindowsIsCalled() throws Exception {
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback());
+ verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
+ verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
+
+
+ }
+
+ @Test
+ public void testEventTimeWindowsAreMergedEagerly() throws Exception {
+ testWindowsAreMergedEagerly(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeWindowsAreMergedEagerly() throws Exception {
+ testWindowsAreMergedEagerly(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * Verify that windows are merged eagerly, if possible.
+ */
+ public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ // in this test we only have one state window and windows are eagerly
+ // merged into the first window
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't intefere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.OnMergeContext context = (Trigger.OnMergeContext) invocation.getArguments()[1];
+ // don't intefere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onMerge(anyTimeWindow(), anyOnMergeContext());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1];
+ timeAdaptor.deleteTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 2)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(3, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and GC timer
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4)));
+
+ shouldMergeWindows(
+ mockAssigner,
+ Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
+ Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
+ new TimeWindow(0, 4));
+
+ // don't register a timer or update state in onElement, this checks
+ // whether onMerge does correctly set those things
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ verify(mockTrigger).onMerge(eq(new TimeWindow(0, 4)), anyOnMergeContext());
+
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ assertEquals(2, timeAdaptor.numTimers(testHarness));
+ }
+
+ @Test
+ public void testMergingOfExistingEventTimeWindows() throws Exception {
+ testMergingOfExistingWindows(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testMergingOfExistingProcessingTimeWindows() throws Exception {
+ testMergingOfExistingWindows(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * Verify that we only keep one of the underlying state windows. This test also verifies that
+ * GC timers are correctly deleted when merging windows.
+ */
+ public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.OnMergeContext context = (Trigger.OnMergeContext) invocation.getArguments()[1];
+ // don't interfere with cleanup timers
+ timeAdaptor.registerTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onMerge(anyTimeWindow(), anyOnMergeContext());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1];
+ // don't interfere with cleanup timers
+ timeAdaptor.deleteTimer(context, 0L);
+ context.getPartitionedState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 2)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(3, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(2, 4)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(5, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set
+ assertEquals(4, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(1, 3)));
+
+ shouldMergeWindows(
+ mockAssigner,
+ Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
+ Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
+ new TimeWindow(0, 4));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(3, testHarness.numKeyedStateEntries()); // window contents plus trigger state plus merging window set
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ }
+
+ @Test
+ public void testOnElementPurgeDoesNotCleanupMergingSet() throws Exception {
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(1, testHarness.numKeyedStateEntries()); // the merging window set
+
+ assertEquals(1, testHarness.numEventTimeTimers()); // one cleanup timer
+
+ assertEquals(0, testHarness.getOutput().size());
+ }
+
+ @Test
+ public void testOnEventTimePurgeDoesNotCleanupMergingSet() throws Exception {
+ testOnTimePurgeDoesNotCleanupMergingSet(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testOnProcessingTimePurgeDoesNotCleanupMergingSet() throws Exception {
+ testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
+ }
+
+ public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 4)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L);
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ timeAdaptor.shouldPurgeOnTime(mockTrigger);
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // the merging window set + window contents
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // one cleanup timer + timer
+ assertEquals(0, testHarness.getOutput().size());
+
+ timeAdaptor.advanceTime(testHarness, 1L);
+
+ assertEquals(1, testHarness.numKeyedStateEntries()); // the merging window set
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // one cleanup timer
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ }
+
+ @Test
+ public void testNoEventTimeGarbageCollectionTimerForGlobalWindow() throws Exception {
+ testNoGarbageCollectionTimerForGlobalWindow(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testNoProcessingTimeGarbageCollectionTimerForGlobalWindow() throws Exception {
+ testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
+ }
+
+ public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
+
+ WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, GlobalWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> mockWindowFunction = mockWindowFunction();
+
+ // this needs to be true for the test to succeed
+ assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // just the window contents
+ assertEquals(1, testHarness.numKeyedStateEntries());
+
+ // verify we have no timers for either time domain
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ }
+
+ @Test
+ public void testNoEventTimeGarbageCollectionTimerForLongMax() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, Long.MAX_VALUE - 10)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // just the window contents
+ assertEquals(1, testHarness.numKeyedStateEntries());
+
+ // no GC timer
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ }
+
+ @Test
+ public void testProcessingTimeGarbageCollectionTimerIsAlwaysWindowMaxTimestamp() throws Exception {
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ when(mockAssigner.isEventTime()).thenReturn(false);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, Long.MAX_VALUE - 10)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // just the window contents
+ assertEquals(1, testHarness.numKeyedStateEntries());
+
+ // no GC timer
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(1, testHarness.numProcessingTimeTimers());
+
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ testHarness.setProcessingTime(Long.MAX_VALUE - 10);
+
+ verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ }
+
+ @Test
+ public void testEventTimeGarbageCollectionTimer() throws Exception {
+ testGarbageCollectionTimer(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeGarbageCollectionTimer() throws Exception {
+ testGarbageCollectionTimer(new ProcessingTimeAdaptor());
+ }
+
+ public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // just the window contents
+ assertEquals(1, testHarness.numKeyedStateEntries());
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness));
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+ verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+ }
+
+ @Test
+ public void testEventTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception {
+ testTriggerTimerAndGarbageCollectionTimerCoincide(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testProcessingTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception {
+ testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
+ }
+
+ public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // 19 is maxTime of window
+ timeAdaptor.registerTimer(context, 19L);
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // just the window contents
+ assertEquals(1, testHarness.numKeyedStateEntries());
+
+ assertEquals(1, timeAdaptor.numTimers(testHarness));
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ timeAdaptor.advanceTime(testHarness, 19); // 19 is maxTime of the window
+
+ verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtEventTimeGarbageCollection() throws Exception {
+ testStateAndTimerCleanupAtEventTimeGarbageCollection(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtProcessingTimeGarbageCollection() throws Exception {
+ testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
+ }
+
+ public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // very far in the future so our watermark does not trigger it
+ timeAdaptor.registerTimer(context, 1000L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.CONTINUE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1];
+ timeAdaptor.deleteTimer(context, 1000L);
+ context.getPartitionedState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // window timers/gc timers
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+
+ timeAdaptor.advanceTime(testHarness, 19 + 20);
+
+ verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(0, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(0, timeAdaptor.numTimers(testHarness)); // window timers/gc timers
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtEventTimeGarbageCollectionWithPurgingTrigger() throws Exception {
+ testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtProcessingTimeGarbageCollectionWithPurgingTrigger() throws Exception {
+ testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * Verify that we correctly clean up even when a purging trigger has purged
+ * window state.
+ */
+ public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // very far in the future so our watermark does not trigger it
+ timeAdaptor.registerTimer(context, 1000L);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1];
+ timeAdaptor.deleteTimer(context, 1000L);
+ context.getPartitionedState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(1, testHarness.numKeyedStateEntries()); // just the trigger state remains
+ assertEquals(2, timeAdaptor.numTimers(testHarness)); // window timers/gc timers
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+
+ timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+ verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(0, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+ assertEquals(0, timeAdaptor.numTimers(testHarness)); // window timers/gc timers
+ assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness));
+
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtEventTimeGarbageCollectionWithPurgingTriggerAndMergingWindows() throws Exception {
+ testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testStateAndTimerCleanupAtProcessingTimeGarbageCollectionWithPurgingTriggerAndMergingWindows() throws Exception {
+ testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * Verify that we correctly clean up even when a purging trigger has purged
+ * window state.
+ */
+ public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+ // very far in the future so our watermark does not trigger it
+ timeAdaptor.registerTimer(context, 1000);
+ context.getPartitionedState(valueStateDescriptor).update("hello");
+ return TriggerResult.PURGE;
+ }
+ }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1];
+ timeAdaptor.deleteTimer(context, 1000);
+ context.getPartitionedState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // clear is only called at cleanup time/GC time
+ verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+ assertEquals(2, testHarness.numKey
<TRUNCATED>