You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/24 05:51:18 UTC

[1/3] flink git commit: [hotfix] Add EvictingWindowOperatorContractTest

Repository: flink
Updated Branches:
  refs/heads/release-1.2 7fbb115bc -> b99883430


[hotfix] Add EvictingWindowOperatorContractTest

The existing WindowOperatorContractTest is turned into a test base while
RegularWindowOperatorContract test tests WindowOperator and
EvictingWindowOperatorTest tests EvictingWindowOperator. For this to
work, the base tests now always use List windows and we have specific
tests for reducing/folding windows in RegularWindowOperatorContractTest.

This also patches in the missing side output support for
EvictingWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e27b6971
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e27b6971
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e27b6971

Branch: refs/heads/release-1.2
Commit: e27b69710c30ee44b0da6c6debadd5bef8d72c07
Parents: 7fbb115
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 15:00:24 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:24:51 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       |  12 +-
 .../EvictingWindowOperatorContractTest.java     |  83 +++++
 .../RegularWindowOperatorContractTest.java      | 269 ++++++++++++++
 .../windowing/WindowOperatorContractTest.java   | 363 ++++++++-----------
 4 files changed, 504 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 17b3984..2d28c00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -85,15 +85,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(
-				element.getValue(),
-				element.getTimestamp(),
-				windowAssignerContext);
+		final Collection<W> elementWindows = windowAssigner.assignWindows(
+				element.getValue(), element.getTimestamp(), windowAssignerContext);
 
-		final K key = (K) getKeyedStateBackend().getCurrentKey();
+		final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
-
 			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
 			for (W window : elementWindows) {
@@ -127,7 +124,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 							}
 						});
 
-				// check if the window is already inactive
+				// drop if the window is already late
 				if (isLate(actualWindow)) {
 					mergingWindows.retireWindow(actualWindow);
 					continue;
@@ -163,6 +160,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				registerCleanupTimer(actualWindow);
 			}
 
+			// need to make sure to update the merging state in state
 			mergingWindows.persist();
 		} else {
 			for (W window : elementWindows) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
new file mode 100644
index 0000000..50c5950
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+/**
+ * These tests verify that {@link EvictingWindowOperator} correctly interacts with the other
+ * windowing components: {@link WindowAssigner},
+ * {@link Trigger}.
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class EvictingWindowOperatorContractTest extends WindowOperatorContractTest {
+
+	@Override
+	protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+		WindowAssigner<Integer, W> assigner,
+		Trigger<Integer, W> trigger,
+		long allowedLatenss,
+		InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception {
+
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		};
+
+		ListStateDescriptor<StreamRecord<Integer>> intListDescriptor =
+			new ListStateDescriptor<>(
+				"int-list",
+				(TypeSerializer<StreamRecord<Integer>>) new StreamElementSerializer(IntSerializer.INSTANCE));
+
+		@SuppressWarnings("unchecked")
+		EvictingWindowOperator<Integer, Integer, OUT, W> operator = new EvictingWindowOperator<>(
+			assigner,
+			assigner.getWindowSerializer(new ExecutionConfig()),
+			keySelector,
+			IntSerializer.INSTANCE,
+			intListDescriptor,
+			windowFunction,
+			trigger,
+			CountEvictor.<W>of(100),
+			allowedLatenss);
+
+		return new KeyedOneInputStreamOperatorTestHarness<>(
+			operator,
+			keySelector,
+			BasicTypeInfo.INT_TYPE_INFO);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
new file mode 100644
index 0000000..13b68c0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * components: {@link WindowAssigner},
+ * {@link Trigger}.
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class RegularWindowOperatorContractTest extends WindowOperatorContractTest {
+
+	@Test
+	public void testReducingWindow() throws Exception {
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+
+		ReducingStateDescriptor<Integer> intReduceSumDescriptor =
+				new ReducingStateDescriptor<>(
+						"int-reduce",
+						new ReduceFunction<Integer>() {
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public Integer reduce(Integer a, Integer b) throws Exception {
+								return a + b;
+							}
+						},
+						IntSerializer.INSTANCE);
+
+		final ValueStateDescriptor<String> valueStateDescriptor =
+				new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
+
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+		assertEquals(0, testHarness.getOutput().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		// insert two elements without firing
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+
+		doAnswer(new Answer<TriggerResult>() {
+			@Override
+			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+				TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+				context.registerEventTimeTimer(window.getEnd());
+				context.getPartitionedState(valueStateDescriptor).update("hello");
+				return TriggerResult.FIRE;
+			}
+		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+
+		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+		// clear is only called at cleanup time/GC time
+		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+		// FIRE should not purge contents
+		assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+		assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+	}
+
+	@Test
+	public void testFoldingWindow() throws Exception {
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
+			new FoldingStateDescriptor<>(
+					"int-fold",
+					0,
+					new FoldFunction<Integer, Integer>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public Integer fold(Integer accumulator, Integer value) throws Exception {
+							return accumulator + value;
+						}
+					},
+					IntSerializer.INSTANCE);
+
+		final ValueStateDescriptor<String> valueStateDescriptor =
+				new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
+
+		assertEquals(0, testHarness.getOutput().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		// insert two elements without firing
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+
+		doAnswer(new Answer<TriggerResult>() {
+			@Override
+			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+				TimeWindow window = (TimeWindow) invocation.getArguments()[2];
+				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+				context.registerEventTimeTimer(window.getEnd());
+				context.getPartitionedState(valueStateDescriptor).update("hello");
+				return TriggerResult.FIRE;
+			}
+		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+		testHarness.processElement(new StreamRecord<>(1, 0L));
+
+		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+		// clear is only called at cleanup time/GC time
+		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
+
+		// FIRE should not purge contents
+		assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
+		assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
+	}
+
+	/**
+	 * Special method for creating a {@link WindowOperator} with a custom {@link StateDescriptor}
+	 * for the window contents state.
+	 */
+	private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+			WindowAssigner<Integer, W> assigner,
+			Trigger<Integer, W> trigger,
+			long allowedLatenss,
+			StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
+			InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
+
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		};
+
+		@SuppressWarnings("unchecked")
+		WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>(
+				assigner,
+				assigner.getWindowSerializer(new ExecutionConfig()),
+				keySelector,
+				IntSerializer.INSTANCE,
+				stateDescriptor,
+				windowFunction,
+				trigger,
+				allowedLatenss);
+
+		return new KeyedOneInputStreamOperatorTestHarness<>(
+				operator,
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+	}
+
+	@Override
+	protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+			WindowAssigner<Integer, W> assigner,
+			Trigger<Integer, W> trigger,
+			long allowedLatenss,
+			InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception {
+
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		};
+
+		ListStateDescriptor<Integer> intListDescriptor =
+				new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
+
+
+		@SuppressWarnings("unchecked")
+		WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>(
+				assigner,
+				assigner.getWindowSerializer(new ExecutionConfig()),
+				keySelector,
+				IntSerializer.INSTANCE,
+				intListDescriptor,
+				windowFunction,
+				trigger,
+				allowedLatenss);
+
+		return new KeyedOneInputStreamOperatorTestHarness<>(
+				operator,
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 47ead66..1d51b45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -18,20 +18,32 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -42,54 +54,37 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
-
 /**
- * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * Base for window operator tests that verify correct interaction with the other windowing
  * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
  * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
  * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
  *
  * <p>These tests document the implicit contract that exists between the windowing components.
  */
-public class WindowOperatorContractTest extends TestLogger {
+public abstract class WindowOperatorContractTest extends TestLogger {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
 
 	private static ValueStateDescriptor<String> valueStateDescriptor =
 			new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null);
 
-	private static ListStateDescriptor<Integer> intListDescriptor =
-			new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
-
-	private static ReducingStateDescriptor<Integer> intReduceSumDescriptor =
-			new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE);
-
-	private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
-			new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE);
-
 	static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception {
 		@SuppressWarnings("unchecked")
 		InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class);
@@ -306,7 +301,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -331,7 +326,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -355,7 +350,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -402,7 +397,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -418,100 +413,6 @@ public class WindowOperatorContractTest extends TestLogger {
 	}
 
 	@Test
-	public void testReducingWindow() throws Exception {
-
-		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
-		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
-		InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
-
-		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
-
-		testHarness.open();
-
-		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
-				.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
-
-		assertEquals(0, testHarness.getOutput().size());
-		assertEquals(0, testHarness.numKeyedStateEntries());
-
-		// insert two elements without firing
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				TimeWindow window = (TimeWindow) invocation.getArguments()[2];
-				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
-				context.registerEventTimeTimer(window.getEnd());
-				context.getPartitionedState(valueStateDescriptor).update("hello");
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
-
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
-		// clear is only called at cleanup time/GC time
-		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
-
-		// FIRE should not purge contents
-		assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
-		assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
-	}
-
-	@Test
-	public void testFoldingWindow() throws Exception {
-
-		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
-		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
-		InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
-
-		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
-
-		testHarness.open();
-
-		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
-				.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
-
-		assertEquals(0, testHarness.getOutput().size());
-		assertEquals(0, testHarness.numKeyedStateEntries());
-
-		// insert two elements without firing
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				TimeWindow window = (TimeWindow) invocation.getArguments()[2];
-				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
-				context.registerEventTimeTimer(window.getEnd());
-				context.getPartitionedState(valueStateDescriptor).update("hello");
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
-
-		testHarness.processElement(new StreamRecord<>(1, 0L));
-
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
-		// clear is only called at cleanup time/GC time
-		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
-
-		// FIRE should not purge contents
-		assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
-		assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
-	}
-
-	@Test
 	public void testEmittingFromWindowFunction() throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -519,7 +420,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -570,7 +471,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -613,7 +514,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -655,7 +556,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -698,7 +599,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -743,7 +644,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -796,7 +697,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -854,7 +755,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -916,7 +817,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -977,7 +878,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1045,7 +946,7 @@ public class WindowOperatorContractTest extends TestLogger {
 				mock(InternalWindowFunction.class);
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1109,7 +1010,7 @@ public class WindowOperatorContractTest extends TestLogger {
 				mock(InternalWindowFunction.class);
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1148,6 +1049,83 @@ public class WindowOperatorContractTest extends TestLogger {
 	}
 
 	@Test
+	public void testNoEventTimeFiringForGarbageCollectedMergingWindow() throws Exception {
+		testNoTimerFiringForGarbageCollectedMergingWindow(new EventTimeAdaptor());
+	}
+
+	@Test
+	public void testNoProcessingTimeFiringForGarbageCollectedMergingWindow() throws Exception {
+		testNoTimerFiringForGarbageCollectedMergingWindow(new ProcessingTimeAdaptor());
+	}
+
+
+	/**
+	 * Verify that we neither invoke the trigger nor the window function if a timer
+	 * fires for a merging window that was already garbage collected.
+	 */
+	public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+
+		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+		timeAdaptor.setIsEventTime(mockAssigner);
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+		@SuppressWarnings("unchecked")
+		InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction =
+				mock(InternalWindowFunction.class);
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
+
+		testHarness.open();
+
+		timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.extractOutputStreamRecords().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		doAnswer(new Answer<TriggerResult>() {
+			@Override
+			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+				// set a timer for after the GC time
+				timeAdaptor.registerTimer(context, 10L);
+				return TriggerResult.CONTINUE;
+			}
+		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+		assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer
+
+		timeAdaptor.shouldContinueOnTime(mockTrigger);
+
+		// this should trigger GC
+		timeAdaptor.advanceTime(testHarness, 4L);
+
+		verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());
+
+		assertEquals(0, testHarness.numKeyedStateEntries());
+		// we still have a dangling timer because our trigger doesn't do cleanup
+		assertEquals(1, timeAdaptor.numTimers(testHarness));
+
+		timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+
+		verify(mockWindowFunction, never())
+				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+
+		// now we trigger the dangling timer
+		timeAdaptor.advanceTime(testHarness, 10L);
+
+		// we don't fire again
+		timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
+	}
+
+	@Test
 	public void testEventTimeTimerCreationAndDeletion() throws Exception {
 		testTimerCreationAndDeletion(new EventTimeAdaptor());
 	}
@@ -1166,7 +1144,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1213,7 +1191,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1271,7 +1249,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1312,7 +1290,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1354,7 +1332,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1451,7 +1429,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1534,7 +1512,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1578,7 +1556,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1626,7 +1604,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1651,7 +1629,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1680,7 +1658,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1726,7 +1704,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1771,7 +1749,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1827,7 +1805,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1897,7 +1875,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1968,7 +1946,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -2034,7 +2012,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -2062,7 +2040,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -2101,7 +2079,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -2140,7 +2118,7 @@ public class WindowOperatorContractTest extends TestLogger {
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.open();
 
@@ -2185,11 +2163,11 @@ public class WindowOperatorContractTest extends TestLogger {
 		}).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext());
 
 		// only fire on the timestamp==0L timers, not the gc timers
-		when(mockTrigger.onEventTime(eq(0L), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onEventTime(eq(0L), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 
 		mockWindowFunction = mockWindowFunction();
 
-		testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction);
+		testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
 
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
@@ -2220,59 +2198,12 @@ public class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.numEventTimeTimers());
 	}
 
-	private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
+	protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
 			WindowAssigner<Integer, W> assigner,
 			Trigger<Integer, W> trigger,
 			long allowedLatenss,
-			StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
-			InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
-
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
+			InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception;
 
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		};
-
-		@SuppressWarnings("unchecked")
-		WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>(
-				assigner,
-				assigner.getWindowSerializer(new ExecutionConfig()),
-				keySelector,
-				IntSerializer.INSTANCE,
-				stateDescriptor,
-				windowFunction,
-				trigger,
-				allowedLatenss);
-
-		return new KeyedOneInputStreamOperatorTestHarness<>(
-				operator,
-				keySelector,
-				BasicTypeInfo.INT_TYPE_INFO);
-	}
-
-
-	private static class Sum implements ReduceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-
-	private static class FoldSum implements FoldFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer fold(
-				Integer accumulator,
-				Integer value) throws Exception {
-			return accumulator + value;
-		}
-	}
 
 	private interface TimeDomainAdaptor {
 


[3/3] flink git commit: [FLINK-5972] Don't allow shrinking merging windows

Posted by tz...@apache.org.
[FLINK-5972] Don't allow shrinking merging windows

This closes #3587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9988343
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9988343
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9988343

Branch: refs/heads/release-1.2
Commit: b99883430decef11f6893d9db8f77bc98458979b
Parents: 785ae63
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:25:03 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 13 ++++
 .../operators/windowing/WindowOperator.java     | 13 ++++
 .../windowing/WindowOperatorContractTest.java   | 80 ++++++++++++++++++++
 3 files changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 2d28c00..b0d36c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -104,6 +104,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 							public void merge(W mergeResult,
 									Collection<W> mergedWindows, W stateWindowResult,
 									Collection<W> mergedStateWindows) throws Exception {
+
+								if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+									throw new UnsupportedOperationException("The end timestamp of an " +
+											"event-time window cannot become earlier than the current watermark " +
+											"by merging. Current watermark: " + internalTimerService.currentWatermark() +
+											" window: " + mergeResult);
+								} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+									throw new UnsupportedOperationException("The end timestamp of a " +
+											"processing-time window cannot become earlier than the current processing time " +
+											"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+											" window: " + mergeResult);
+								}
+
 								context.key = key;
 								context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3144b6d..d2e819f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -303,6 +303,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					public void merge(W mergeResult,
 							Collection<W> mergedWindows, W stateWindowResult,
 							Collection<W> mergedStateWindows) throws Exception {
+
+						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+							throw new UnsupportedOperationException("The end timestamp of an " +
+									"event-time window cannot become earlier than the current watermark " +
+									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
+									" window: " + mergeResult);
+						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+							throw new UnsupportedOperationException("The end timestamp of a " +
+									"processing-time window cannot become earlier than the current processing time " +
+									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+									" window: " + mergeResult);
+						}
+
 						context.key = key;
 						context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index a206455..aee3133 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1409,6 +1409,86 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 	@Test
+	public void testRejectShrinkingMergingEventTimeWindows() throws Exception {
+		testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+	}
+
+	@Test
+	public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception {
+		testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+	}
+
+	/**
+	 * A misbehaving {@code WindowAssigner} can cause a window to become late by merging if
+	 * it moves the end-of-window time before the watermark. This verifies that we don't allow that.
+	 */
+	void testRejectShrinkingMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+		int allowedLateness = 10;
+
+		if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+			// we don't have allowed lateness for processing time
+			allowedLateness = 0;
+		}
+
+		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+		timeAdaptor.setIsEventTime(mockAssigner);
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
+
+		testHarness.open();
+
+		timeAdaptor.advanceTime(testHarness, 0);
+
+		assertEquals(0, testHarness.extractOutputStreamRecords().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 22)));
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+		timeAdaptor.advanceTime(testHarness, 20);
+
+		// our window should still be there
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+		// the result timestamp is ... + 2 because a watermark t says no element with
+		// timestamp <= t will come in the future and because window ends are exclusive:
+		// a window (0, 12) will have 11 as maxTimestamp. With the watermark at 20, 10 would
+		// already be considered late
+		shouldMergeWindows(
+				mockAssigner,
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+				new TimeWindow(0, 20 - allowedLateness + 2));
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		// now merge it to a window that is just late
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+		shouldMergeWindows(
+				mockAssigner,
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+				new TimeWindow(0, 20 - allowedLateness + 1));
+
+		expectedException.expect(UnsupportedOperationException.class);
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+	}
+
+	@Test
 	public void testMergingOfExistingEventTimeWindows() throws Exception {
 		testMergingOfExistingWindows(new EventTimeAdaptor());
 	}


[2/3] flink git commit: [hotfix] Fix various small issues in WindowOperatorContractTest

Posted by tz...@apache.org.
[hotfix] Fix various small issues in WindowOperatorContractTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785ae632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785ae632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785ae632

Branch: refs/heads/release-1.2
Commit: 785ae63292c6df889cd4da42d27ffcbe5862546d
Parents: e27b697
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 22 17:02:15 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:25:03 2017 +0800

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 91 +++++++++-----------
 1 file changed, 41 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/785ae632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 1d51b45..a206455 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -36,7 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -154,7 +155,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	@SuppressWarnings("unchecked")
 	static Iterable<Integer> intIterable(Integer... values) {
-		return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+		return (Iterable<Integer>) argThat(contains(values));
 	}
 
 	static TimeWindow anyTimeWindow() {
@@ -246,51 +247,51 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 	private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
 	}
 
 	private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 	}
 
 	private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
 	}
 
 	private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
-		when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
 	}
 
 	@Test
@@ -450,7 +451,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
-				containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
 	}
 
 	@Test
@@ -464,7 +465,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 
-	private  void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -503,7 +504,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
-				containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
+				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
 	}
 
 	@Test
@@ -997,9 +998,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	/**
 	 * Verify that we neither invoke the trigger nor the window function if a timer
-	 * for a non-existent merging window fires.
+	 * for an empty merging window fires.
 	 */
-	public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1063,7 +1064,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we neither invoke the trigger nor the window function if a timer
 	 * fires for a merging window that was already garbage collected.
 	 */
-	public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1096,7 +1097,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			}
 		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
 
-
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
 		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
@@ -1241,7 +1241,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
 	}
 
-	public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1302,8 +1302,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback());
-		verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
+		verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+		verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
+
 		verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
 
 
@@ -1322,7 +1323,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	/**
 	 * Verify that windows are merged eagerly, if possible.
 	 */
-	public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		// in this test we only have one state window and windows are eagerly
 		// merged into the first window
 
@@ -1386,8 +1387,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		shouldMergeWindows(
 				mockAssigner,
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
 				new TimeWindow(0, 4));
 
 		// don't register a timer or update state in onElement, this checks
@@ -1421,7 +1422,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we only keep one of the underlying state windows. This test also verifies that
 	 * GC timers are correctly deleted when merging windows.
 	 */
-	public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1492,8 +1493,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		shouldMergeWindows(
 				mockAssigner,
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
-				Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
 				new TimeWindow(0, 4));
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
@@ -1548,7 +1549,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
 	}
 
-	public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1593,7 +1594,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
 	}
 
-	public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
@@ -1697,7 +1698,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testGarbageCollectionTimer(new ProcessingTimeAdaptor());
 	}
 
-	public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1742,7 +1743,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
 	}
 
-	public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1798,7 +1799,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
 	}
 
-	public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we correctly clean up even when a purging trigger has purged
 	 * window state.
 	 */
-	public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1939,7 +1940,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	 * Verify that we correctly clean up even when a purging trigger has purged
 	 * window state.
 	 */
-	public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2005,7 +2006,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor());
 	}
 
-	public void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
+	private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2050,12 +2051,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.getOutput().size());
 		assertEquals(0, testHarness.numKeyedStateEntries());
 
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+		shouldFireOnElement(mockTrigger);
 
 		// 20 is just at the limit, window.maxTime() is 1 and allowed lateness is 20
 		testHarness.processWatermark(new Watermark(20));
@@ -2089,12 +2085,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.getOutput().size());
 		assertEquals(0, testHarness.numKeyedStateEntries());
 
-		doAnswer(new Answer<TriggerResult>() {
-			@Override
-			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
-				return TriggerResult.FIRE;
-			}
-		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+		shouldFireOnElement(mockTrigger);
 
 		// window.maxTime() == 1 plus 20L of allowed lateness
 		testHarness.processWatermark(new Watermark(21));