You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 14:01:44 UTC

[04/10] flink git commit: [FLINK-4552] Refactor WindowOperator/Trigger Tests

[FLINK-4552] Refactor WindowOperator/Trigger Tests

Before, tests for WindowOperator, WindowAssigner, Trigger and
WindowFunction were all conflated in WindowOperatorTest. All of these
test that a certain combination of a Trigger, WindowAssigner and
WindowFunction produces the expected output.

This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.

The new WindowOperatorContractTest verifies that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These
tests also verify that the different state types and WindowFunctions
work correctly.

For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers and verify that they fire at the
correct times. The actual output of the WindowFunction is not important
for these tests. The new tests also make sure that triggers correctly
clean up state and timers.

WindowAssigner tests verify the behaviour of window assigners in
isolation. They also test, for example, whether the offset parameter of
time-based windows works correctly.

We keep the old WindowOperatorTest because it still provides some level
of coverage and doesn't take long to run.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb8586e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb8586e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb8586e5

Branch: refs/heads/release-1.2
Commit: bb8586e50be9e7d0b1fba5569579def595c53217
Parents: a9189c2
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 12:01:11 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 14:53:22 2017 +0100

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       |   45 +
 .../runtime/state/MemoryStateBackendTest.java   |   43 +
 .../assigners/EventTimeSessionWindows.java      |    4 +
 .../api/windowing/assigners/GlobalWindows.java  |    4 +-
 .../assigners/ProcessingTimeSessionWindows.java |    4 +
 .../assigners/SlidingEventTimeWindows.java      |   28 +-
 .../assigners/SlidingProcessingTimeWindows.java |   30 +-
 .../assigners/TumblingEventTimeWindows.java     |   36 +-
 .../TumblingProcessingTimeWindows.java          |   32 +-
 .../tasks/TestProcessingTimeService.java        |    2 +-
 .../api/operators/TestInternalTimerService.java |  238 ++
 .../TestProcessingTimeServiceTest.java          |    2 +-
 .../operators/windowing/CountTriggerTest.java   |  166 ++
 .../windowing/EventTimeSessionWindowsTest.java  |  179 ++
 .../windowing/EventTimeTriggerTest.java         |  153 ++
 .../operators/windowing/GlobalWindowsTest.java  |   59 +
 .../windowing/MergingWindowSetTest.java         |   25 +
 .../ProcessingTimeSessionWindowsTest.java       |  184 ++
 .../windowing/ProcessingTimeTriggerTest.java    |  134 +
 .../operators/windowing/PurgingTriggerTest.java |  149 ++
 .../windowing/SimpleTriggerTestHarness.java     |   41 +
 .../windowing/SlidingEventTimeWindowsTest.java  |  168 ++
 .../SlidingProcessingTimeWindowsTest.java       |  177 ++
 .../windowing/StreamRecordMatchers.java         |  179 ++
 .../operators/windowing/TriggerTestHarness.java |  369 +++
 .../windowing/TumblingEventTimeWindowsTest.java |  113 +
 .../TumblingProcessingTimeWindowsTest.java      |  129 +
 .../windowing/WindowOperatorContractTest.java   | 2495 ++++++++++++++++++
 .../operators/windowing/WindowOperatorTest.java |  171 --
 .../operators/windowing/WindowedValue.java      |   47 +
 .../windowing/WindowingTestHarnessTest.java     |  230 --
 .../util/AbstractStreamOperatorTestHarness.java |   20 +
 .../KeyedOneInputStreamOperatorTestHarness.java |   17 +
 .../flink/streaming/util/TestHarnessUtil.java   |   15 +-
 .../streaming/util/WindowingTestHarness.java    |  221 --
 35 files changed, 5209 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 9a9178a..9c16ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -505,4 +506,48 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			return stateSerializer;
 		}
 	}
+
+	/**
+	 * Returns the total number of state entries across all keys/namespaces.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	public int numStateEntries() {
+		int sum = 0;
+		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+			for (Map namespaceMap : stateTable.getState()) {
+				if (namespaceMap == null) {
+					continue;
+				}
+				Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
+				for (Map entriesMap : typedMap.values()) {
+					sum += entriesMap.size();
+				}
+			}
+		}
+		return sum;
+	}
+
+	/**
+	 * Returns the total number of state entries across all keys for the given namespace.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	public <N> int numStateEntries(N namespace) {
+		int sum = 0;
+		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+			for (Map namespaceMap : stateTable.getState()) {
+				if (namespaceMap == null) {
+					continue;
+				}
+				Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
+				Map singleNamespace = typedMap.get(namespace);
+				if (singleNamespace != null) {
+					sum += singleNamespace.size();
+				}
+			}
+		}
+		return sum;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index ac6adff..c267afc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 
@@ -56,6 +61,44 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	public void testReducingStateRestoreWithWrongSerializers() {}
 
 	@Test
+	@SuppressWarnings("unchecked")
+	public void testNumStateEntries() throws Exception {
+		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
+
+		assertEquals(0, heapBackend.numStateEntries());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(0);
+		state.update("hello");
+		state.update("ciao");
+
+		assertEquals(1, heapBackend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.update("foo");
+
+		assertEquals(2, heapBackend.numStateEntries());
+
+		backend.setCurrentKey(0);
+		state.clear();
+
+		assertEquals(1, heapBackend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.clear();
+
+		assertEquals(0, heapBackend.numStateEntries());
+
+		backend.dispose();
+	}
+
+	@Test
 	public void testOversizedState() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index e38f617..1703f6c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -47,6 +47,10 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
 	protected long sessionTimeout;
 
 	protected EventTimeSessionWindows(long sessionTimeout) {
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
+		}
+
 		this.sessionTimeout = sessionTimeout;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 7ea3158..9e3846d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +71,8 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	/**
 	 * A trigger that never fires, as default Trigger for GlobalWindows.
 	 */
-	private static class NeverTrigger extends Trigger<Object, GlobalWindow> {
+	@Internal
+	public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 52d1c03..02c680e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -47,6 +47,10 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
 	protected long sessionTimeout;
 
 	protected ProcessingTimeSessionWindows(long sessionTimeout) {
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size");
+		}
+
 		this.sessionTimeout = sessionTimeout;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 16171a0..ef6ed56 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -55,6 +55,10 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	private final long offset;
 
 	protected SlidingEventTimeWindows(long size, long slide, long offset) {
+		if (offset < 0 || offset >= slide || size <= 0) {
+			throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+		}
+
 		this.size = size;
 		this.slide = slide;
 		this.offset = offset;
@@ -109,20 +113,18 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	}
 
 	/**
-	 *  Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
-	 *  elements to time windows based on the element timestamp and offset.
-	 *<p>
-	 *     For example, if you want window a stream by hour,but window begins at the 15th minutes
-	 *     of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
-	 *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-	 *</p>
+	 * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp and offset.
+	 *
+	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
+	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+	 *
+	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
+	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
+	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
 	 *
-	 * <p>
-	 *     Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
-	 *     such as China which is using UTC+08:00,and you want a time window with size of one day,
-	 *     and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
-	 *     The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
-	 * </p>
 	 * @param size The size of the generated windows.
 	 * @param slide  The slide interval of the generated windows.
 	 * @param offset The offset which window start would be shifted by.

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index e03467f..c11045d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -51,7 +51,11 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 
 	private final long slide;
 
-	private SlidingProcessingTimeWindows(long size, long slide, long offset){
+	private SlidingProcessingTimeWindows(long size, long slide, long offset) {
+		if (offset < 0 || offset >= slide || size <= 0) {
+			throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+		}
+
 		this.size = size;
 		this.slide = slide;
 		this.offset = offset;
@@ -101,20 +105,18 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	}
 
 	/**
-	 *  Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
-	 *  elements to time windows based on the element timestamp and offset.
-	 *<p>
-	 *     For example, if you want window a stream by hour,but window begins at the 15th minutes
-	 *     of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
-	 *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-	 *</p>
+	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp and offset.
+	 *
+	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
+	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+	 *
+	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
+	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
+	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
 	 *
-	 * <p>
-	 *     Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
-	 *     such as China which is using UTC+08:00,and you want a time window with size of one day,
-	 *     and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
-	 *     The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
-	 * </p>
 	 * @param size The size of the generated windows.
 	 * @param slide  The slide interval of the generated windows.
 	 * @param offset The offset which window start would be shifted by.

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index b7fa343..d695a0c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -51,7 +51,11 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 
 	private final long offset;
 
-	protected TumblingEventTimeWindows(long size, long offset){
+	protected TumblingEventTimeWindows(long size, long offset) {
+		if (offset < 0 || offset >= size) {
+			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
+		}
+
 		this.size = size;
 		this.offset = offset;
 	}
@@ -68,10 +72,6 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 		}
 	}
 
-	public long getSize() {
-		return size;
-	}
-
 	@Override
 	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
 		return EventTimeTrigger.create();
@@ -94,26 +94,24 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	}
 
 	/**
-	 *  Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
-	 *  elements to time windows based on the element timestamp and offset.
-	 *<p>
-	 *     For example, if you want window a stream by hour,but window begins at the 15th minutes
-	 *     of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
-	 *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-	 *</p>
+	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp and offset.
+	 *
+	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
+	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+	 *
+	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
+	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
+	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
 	 *
-	 * <p>
-	 *     Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
-	 *     such as China which is using UTC+08:00,and you want a time window with size of one day,
-	 *     and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
-	 *     The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
-	 * </p>
 	 * @param size The size of the generated windows.
 	 * @param offset The offset which window start would be shifted by.
 	 * @return The time policy.
 	 */
 	public static TumblingEventTimeWindows of(Time size, Time offset) {
-		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
+		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index f4fb620..5b39fe0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -49,7 +49,11 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	private final long offset;
 
 
-	private TumblingProcessingTimeWindows(long size,long offset) {
+	private TumblingProcessingTimeWindows(long size, long offset) {
+		if (offset < 0 || offset >= size) {
+			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
+		}
+
 		this.size = size;
 		this.offset = offset;
 	}
@@ -87,26 +91,24 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	}
 
 	/**
-	 *  Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
-	 *  elements to time windows based on the element timestamp and offset.
-	 *<p>
-	 *     For example, if you want window a stream by hour,but window begins at the 15th minutes
-	 *     of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
-	 *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-	 *</p>
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp and offset.
+	 *
+	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
+	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+	 *
+	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
+	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
+	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
 	 *
-	 * <p>
-	 *     Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
-	 *     such as China which is using UTC+08:00,and you want a time window with size of one day,
-	 *     and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
-	 *     The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
-	 * </p>
 	 * @param size The size of the generated windows.
 	 * @param offset The offset which window start would be shifted by.
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index b4e7e97..e5fcc1a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * */
 public class TestProcessingTimeService extends ProcessingTimeService {
 
-	private volatile long currentTime = 0L;
+	private volatile long currentTime = Long.MIN_VALUE;
 
 	private volatile boolean isTerminated;
 	private volatile boolean isQuiesced;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
new file mode 100644
index 0000000..a03a4c5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Implementation of {@link InternalTimerService} meant to be used for testing.
+ */
+@Internal
+public class TestInternalTimerService<K, N> implements InternalTimerService<N> {
+
+	private long currentProcessingTime = Long.MIN_VALUE;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<Timer<K, N>> processingTimeTimersQueue;
+	private final Set<Timer<K, N>> processingTimeTimers;
+
+	/**
+	 * Current waiting watermark callbacks.
+	 */
+	private final Set<Timer<K, N>> watermarkTimers;
+	private final PriorityQueue<Timer<K, N>> watermarkTimersQueue;
+
+	public TestInternalTimerService(KeyContext keyContext) {
+		this.keyContext = keyContext;
+
+		watermarkTimers = new HashSet<>();
+		watermarkTimersQueue = new PriorityQueue<>(100);
+		processingTimeTimers = new HashSet<>();
+		processingTimeTimersQueue = new PriorityQueue<>(100);
+	}
+
+	@Override
+	public long currentProcessingTime() {
+		return currentProcessingTime;
+	}
+
+	@Override
+	public long currentWatermark() {
+		return currentWatermark;
+	}
+
+	@Override
+	public void registerProcessingTimeTimer(N namespace, long time) {
+		@SuppressWarnings("unchecked")
+		Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace);
+		// make sure each (timestamp, key, namespace) timer is only put into the queue once
+		if (processingTimeTimers.add(timer)) {
+			processingTimeTimersQueue.add(timer);
+		}
+	}
+
+	@Override
+	public void registerEventTimeTimer(N namespace, long time) {
+		@SuppressWarnings("unchecked")
+		Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace);
+		if (watermarkTimers.add(timer)) {
+			watermarkTimersQueue.add(timer);
+		}
+	}
+
+	@Override
+	public void deleteProcessingTimeTimer(N namespace, long time) {
+		@SuppressWarnings("unchecked")
+		Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace);
+
+		if (processingTimeTimers.remove(timer)) {
+			processingTimeTimersQueue.remove(timer);
+		}
+	}
+
+	@Override
+	public void deleteEventTimeTimer(N namespace, long time) {
+		@SuppressWarnings("unchecked")
+		Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace);
+		if (watermarkTimers.remove(timer)) {
+			watermarkTimersQueue.remove(timer);
+		}
+	}
+
+	public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception {
+		List<Timer<K, N>> result = new ArrayList<>();
+
+		Timer<K, N> timer = processingTimeTimersQueue.peek();
+
+		while (timer != null && timer.timestamp <= time) {
+			processingTimeTimers.remove(timer);
+			processingTimeTimersQueue.remove();
+			result.add(timer);
+			timer = processingTimeTimersQueue.peek();
+		}
+
+		currentProcessingTime = time;
+		return result;
+	}
+
+	public Collection<Timer<K, N>> advanceWatermark(long time) throws Exception {
+		List<Timer<K, N>> result = new ArrayList<>();
+
+		Timer<K, N> timer = watermarkTimersQueue.peek();
+
+		while (timer != null && timer.timestamp <= time) {
+			watermarkTimers.remove(timer);
+			watermarkTimersQueue.remove();
+			result.add(timer);
+			timer = watermarkTimersQueue.peek();
+		}
+
+		currentWatermark = time;
+		return result;
+	}
+
+	/**
+	 * Internal class for keeping track of in-flight timers.
+	 */
+	public static class Timer<K, N> implements Comparable<Timer<K, N>> {
+		private final long timestamp;
+		private final K key;
+		private final N namespace;
+
+		public Timer(long timestamp, K key, N namespace) {
+			this.timestamp = timestamp;
+			this.key = key;
+			this.namespace = namespace;
+		}
+
+		public long getTimestamp() {
+			return timestamp;
+		}
+
+		public K getKey() {
+			return key;
+		}
+
+		public N getNamespace() {
+			return namespace;
+		}
+
+		@Override
+		public int compareTo(Timer<K, N> o) {
+			return Long.compare(this.timestamp, o.timestamp);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()){
+				return false;
+			}
+
+			Timer<?, ?> timer = (Timer<?, ?>) o;
+
+			return timestamp == timer.timestamp
+					&& key.equals(timer.key)
+					&& namespace.equals(timer.namespace);
+
+		}
+
+		@Override
+		public int hashCode() {
+			int result = (int) (timestamp ^ (timestamp >>> 32));
+			result = 31 * result + key.hashCode();
+			result = 31 * result + namespace.hashCode();
+			return result;
+		}
+
+		@Override
+		public String toString() {
+			return "Timer{" +
+					"timestamp=" + timestamp +
+					", key=" + key +
+					", namespace=" + namespace +
+					'}';
+		}
+	}
+
+	public int numProcessingTimeTimers() {
+		return processingTimeTimers.size();
+	}
+
+	public int numEventTimeTimers() {
+		return watermarkTimers.size();
+	}
+
+	public int numProcessingTimeTimers(N namespace) {
+		int count = 0;
+		for (Timer<K, N> timer : processingTimeTimers) {
+			if (timer.getNamespace().equals(namespace)) {
+				count++;
+			}
+		}
+
+		return count;
+	}
+
+	public int numEventTimeTimers(N namespace) {
+		int count = 0;
+		for (Timer<K, N> timer : watermarkTimers) {
+			if (timer.getNamespace().equals(namespace)) {
+				count++;
+			}
+		}
+
+		return count;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 2903758..4d24b82 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -52,7 +52,7 @@ public class TestProcessingTimeServiceTest {
 
 		testHarness.invoke();
 
-		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 0);
+		assertEquals(Long.MIN_VALUE, testHarness.getProcessingTimeService().getCurrentProcessingTime());
 
 		tp.setCurrentTime(11);
 		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 11);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
new file mode 100644
index 0000000..16e353b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+	/**
+	 * Verify that state of separate windows does not leak into other windows.
+	 */
+	@Test
+	public void testWindowSeparationAndFiring() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// right now, CountTrigger will clear its state in onElement when firing
+		// ideally, this should be moved to onFire()
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// now all state should be gone
+		assertEquals(0, testHarness.numStateEntries());
+	}
+
+	/**
+	 * Verify that clear() does not leak across windows.
+	 */
+	@Test
+	public void testClear() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+	}
+
+	@Test
+	public void testMergingWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(3, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 4)));
+
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 4)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6)));
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+		assertEquals(0, testHarness.numStateEntries());
+	}
+
+	@Test
+	public void testMergeSubsumingWindow() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 8), Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(4, 6)));
+
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(4, 6)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 8)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 8)));
+
+		assertEquals(0, testHarness.numStateEntries());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
new file mode 100644
index 0000000..a46572b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		final int SESSION_GAP = 5000;
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+	}
+
+	@Test
+	public void testMergeSinglePointWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeSingleWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeConsecutiveWindows() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(0, 1),
+						new TimeWindow(1, 2),
+						new TimeWindow(2, 3),
+						new TimeWindow(4, 5),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
+				eq(new TimeWindow(0, 3)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+				eq(new TimeWindow(4, 6)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeCoveringWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(1, 1),
+						new TimeWindow(0, 2),
+						new TimeWindow(4, 7),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+				eq(new TimeWindow(0, 2)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+				eq(new TimeWindow(4, 7)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit
+
+		final int SESSION_GAP = 5000;
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		try {
+			EventTimeSessionWindows.withGap(Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < size"));
+		}
+
+		try {
+			EventTimeSessionWindows.withGap(Time.seconds(0));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < size"));
+		}
+
+	}
+
+	@Test
+	public void testProperties() {
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(5));
+
+		assertTrue(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
new file mode 100644
index 0000000..2d93ac0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+	/**
+	 * Verify that state of separate windows does not leak into other windows.
+	 */
+	@Test
+	public void testWindowSeparationAndFiring() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+		// inject several elements
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+	/**
+	 * Verify that late elements trigger immediately and also that we don't set a timer
+	 * for those.
+	 */
+	@Test
+	public void testLateElementTriggersImmediately() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+		testHarness.advanceWatermark(2);
+
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+
+	/**
+	 * Verify that clear() does not leak across windows.
+	 */
+	@Test
+	public void testClear() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+	@Test
+	public void testMergingWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+		assertTrue(EventTimeTrigger.create().canMerge());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(0, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
new file mode 100644
index 0000000..37fad7e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link GlobalWindows}.
+ */
+public class GlobalWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		final WindowAssigner.WindowAssignerContext context =
+				mock(WindowAssigner.WindowAssignerContext.class);
+		final GlobalWindows globalWindows = GlobalWindows.create();
+
+		// every probed timestamp is expected to yield exactly the one GlobalWindow
+		for (long timestamp : new long[] {0L, 4999L, 5000L}) {
+			assertThat(globalWindows.assignWindows("String", timestamp, context), contains(GlobalWindow.get()));
+		}
+	}
+
+	@Test
+	public void testProperties() {
+		final GlobalWindows globalWindows = GlobalWindows.create();
+		// checked below: not event-time, GlobalWindow serializer, NeverTrigger as default trigger
+		assertFalse(globalWindows.isEventTime());
+		assertEquals(new GlobalWindow.Serializer(), globalWindows.getWindowSerializer(new ExecutionConfig()));
+		assertThat(globalWindows.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(GlobalWindows.NeverTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 46169a8..aa9cb91 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -290,6 +290,31 @@ public class MergingWindowSetTest {
 	}
 
 	/**
+	 * Test adding a new window that is identical to an existing window. This should not cause
+	 * a merge.
+	 */
+	@Test
+	public void testAddingIdenticalWindows() throws Exception {
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> stateMock = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windows =
+				new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), stateMock);
+
+		TestingMergeFunction merges = new TestingMergeFunction();
+
+		// the first pass adds the window, the second pass adds the identical window again;
+		// both passes must hand back the window unchanged without reporting a merge
+		for (int pass = 0; pass < 2; pass++) {
+			merges.reset();
+			assertEquals(new TimeWindow(1, 2), windows.addWindow(new TimeWindow(1, 2), merges));
+			assertFalse(merges.hasMerged());
+			assertEquals(new TimeWindow(1, 2), windows.getStateWindow(new TimeWindow(1, 2)));
+		}
+	}
+
+
+	/**
 	 * Test merging of a large new window that covers multiple existing windows.
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
new file mode 100644
index 0000000..461b5fc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link ProcessingTimeSessionWindows}.
+ */
+public class ProcessingTimeSessionWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+		// each expected window starts at the stubbed processing time and spans the 5000 ms gap
+		ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+		// the element timestamp stays Long.MIN_VALUE; only the stubbed processing time varies
+		when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4999, 9999)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
+	}
+
+	@Test
+	public void testMergeSinglePointWindow() {
+		final MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
+		final ProcessingTimeSessionWindows sessions = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		// the only candidate is a single zero-length window
+		sessions.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), mergeCallback);
+		// so no merge may be reported to the callback
+		verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeSingleWindow() {
+		final MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
+		final ProcessingTimeSessionWindows sessions = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+		// the only candidate is the single window (0, 1)
+		sessions.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), mergeCallback);
+		// so no merge may be reported to the callback
+		verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeConsecutiveWindows() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+		// (0,1), (1,2), (2,3) share their end/start points, as do (4,5), (5,6); (2,3) and (4,5) do not
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(0, 1),
+						new TimeWindow(1, 2),
+						new TimeWindow(2, 3),
+						new TimeWindow(4, 5),
+						new TimeWindow(5, 6)),
+				callback);
+		// expected merge #1: the three windows between 0 and 3 collapse into (0, 3)
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
+				eq(new TimeWindow(0, 3)));
+		// expected merge #2: the two windows between 4 and 6 collapse into (4, 6)
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+				eq(new TimeWindow(4, 6)));
+		// and there must be no merge call beyond those two
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeCoveringWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+
+		ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+		// (1,1) lies inside (0,2) and (5,6) lies inside (4,7)
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(1, 1),
+						new TimeWindow(0, 2),
+						new TimeWindow(4, 7),
+						new TimeWindow(5, 6)),
+				callback);
+		// expected merge #1: (0, 2) and (1, 1) are merged and the result stays (0, 2)
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+				eq(new TimeWindow(0, 2)));
+		// expected merge #2: (4, 7) and (5, 6) are merged and the result stays (4, 7)
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+				eq(new TimeWindow(4, 7)));
+		// and there must be no merge call beyond those two
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit: a gap of 5 seconds must give 5000 ms windows
+		final WindowAssigner.WindowAssignerContext context =
+				mock(WindowAssigner.WindowAssignerContext.class);
+		final ProcessingTimeSessionWindows sessions = ProcessingTimeSessionWindows.withGap(Time.seconds(5));
+		// stubbed processing time 0
+		when(context.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(sessions.assignWindows("String", Long.MIN_VALUE, context), contains(timeWindow(0, 5000)));
+
+		// stubbed processing time 4999
+		when(context.getCurrentProcessingTime()).thenReturn(4999L);
+		assertThat(sessions.assignWindows("String", Long.MIN_VALUE, context), contains(timeWindow(4999, 9999)));
+
+		// stubbed processing time 5000
+		when(context.getCurrentProcessingTime()).thenReturn(5000L);
+		assertThat(sessions.assignWindows("String", Long.MIN_VALUE, context), contains(timeWindow(5000, 10000)));
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		// a negative gap and a zero gap are both expected to be rejected
+		final long[] invalidGapsInSeconds = {-1, 0};
+
+		for (long gapInSeconds : invalidGapsInSeconds) {
+			try {
+				ProcessingTimeSessionWindows.withGap(Time.seconds(gapInSeconds));
+
+				// only reached when withGap() accepted the invalid gap
+				fail("should fail");
+			} catch (IllegalArgumentException rejected) {
+				// the exception text has to contain "0 < size"
+				assertThat(rejected.toString(), containsString("0 < size"));
+			}
+		}
+	}
+
+	@Test
+	public void testProperties() {
+		final ProcessingTimeSessionWindows sessions = ProcessingTimeSessionWindows.withGap(Time.seconds(5));
+		// checked below: not event-time, TimeWindow serializer, ProcessingTimeTrigger as default trigger
+		assertFalse(sessions.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), sessions.getWindowSerializer(new ExecutionConfig()));
+		assertThat(sessions.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
new file mode 100644
index 0000000..a0c2347
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ProcessingTimeTrigger}.
+ */
+public class ProcessingTimeTriggerTest {
+
+	/**
+	 * Verify that timers of separate windows are tracked independently and that each window fires on its own.
+	 */
+	@Test
+	public void testWindowSeparationAndFiring() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer());
+
+		// inject several elements: three into window (0, 2) and two into window (2, 4)
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		// no trigger state; each window holds one processing-time timer regardless of its element count
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(2, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(2, new TimeWindow(0, 2)));
+		// firing window (0, 2) removed only that window's timer
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(4, new TimeWindow(2, 4)));
+		// after the second window fired nothing is left behind
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+	/**
+	 * Verify that clear() only removes the timer of the window it is called for.
+	 */
+	@Test
+	public void testClear() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		// starting point: one processing-time timer per window, no trigger state, no event-time timers
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(2, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(2, 4));
+		// only the timer of window (2, 4) is gone, window (0, 2) keeps its timer
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+		// nothing of any kind may remain: state, processing-time timers and event-time timers
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+	@Test
+	public void testMergingWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer());
+		// checks that merging replaces the timers of the source windows by one timer on the merged window
+		assertTrue(ProcessingTimeTrigger.create().canMerge());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		// before the merge: one processing-time timer per source window
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(2, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+		// after the merge: the source windows hold no timers, the merged window (0, 4) holds one
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 4)));
+
+		assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(4, new TimeWindow(0, 4)));
+		// firing the merged window leaves no state or timers behind
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
new file mode 100644
index 0000000..4302d4d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyOnMergeContext;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTimeWindow;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+	/**
+	 * Sanity check: every method declared on {@link Trigger} must also be declared directly
+	 * on {@link PurgingTrigger}.
+	 */
+	@Test
+	public void testAllMethodsImplemented() throws NoSuchMethodException {
+		for (Method declaredOnTrigger : Trigger.class.getDeclaredMethods()) {
+			final String name = declaredOnTrigger.getName();
+			// the lookup throws NoSuchMethodException when PurgingTrigger has no matching declaration
+			PurgingTrigger.class.getDeclaredMethod(name, declaredOnTrigger.getParameterTypes());
+		}
+	}
+
+	@Test
+	public void testForwarding() throws Exception {
+		Trigger<Object, TimeWindow> mockTrigger = mock(Trigger.class);
+		// the PurgingTrigger under test wraps the mock, so every forwarded call can be stubbed and verified
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(PurgingTrigger.of(mockTrigger), new TimeWindow.Serializer());
+		// onElement(): CONTINUE and PURGE pass through, FIRE and FIRE_AND_PURGE both come out as FIRE_AND_PURGE
+		when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		assertEquals(TriggerResult.PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		doAnswer(new Answer<TriggerResult>() {
+			@Override
+			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+
+				// register some timers that we can step through to call onEventTime several
+				// times in a row
+				context.registerEventTimeTimer(1);
+				context.registerEventTimeTimer(2);
+				context.registerEventTimeTimer(3);
+				context.registerEventTimeTimer(4);
+				return TriggerResult.CONTINUE;
+			}
+		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+		// set up our timers
+		testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2));
+
+		assertEquals(4, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		// onEventTime(): same result mapping as checked for onElement() above
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		assertEquals(TriggerResult.CONTINUE, testHarness.advanceWatermark(1, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceWatermark(3, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		assertEquals(TriggerResult.PURGE, testHarness.advanceWatermark(4, new TimeWindow(0, 2)));
+
+		doAnswer(new Answer<TriggerResult>() {
+			@Override
+			public TriggerResult answer(InvocationOnMock invocation) throws Exception {
+				Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
+
+				// register some timers that we can step through to call onProcessingTime several
+				// times in a row
+				context.registerProcessingTimeTimer(1);
+				context.registerProcessingTimeTimer(2);
+				context.registerProcessingTimeTimer(3);
+				context.registerProcessingTimeTimer(4);
+				return TriggerResult.CONTINUE;
+			}
+		}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
+
+		// set up our timers
+		testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2));
+
+		assertEquals(4, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		// onProcessingTime(): same result mapping as checked for onElement() above
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+		assertEquals(TriggerResult.CONTINUE, testHarness.advanceProcessingTime(1, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceProcessingTime(2, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+		assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceProcessingTime(3, new TimeWindow(0, 2)));
+
+		when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+		assertEquals(TriggerResult.PURGE, testHarness.advanceProcessingTime(4, new TimeWindow(0, 2)));
+		// onMerge() and clear() are each expected to reach the wrapped trigger exactly once
+		testHarness.mergeWindows(new TimeWindow(0, 2), Collections.singletonList(new TimeWindow(0, 1)));
+		verify(mockTrigger, times(1)).onMerge(anyTimeWindow(), anyOnMergeContext());
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+		verify(mockTrigger, times(1)).clear(eq(new TimeWindow(0, 2)), anyTriggerContext());
+	}
+
+}