You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/24 09:52:50 UTC

[3/9] flink git commit: [FLINK-4552] Refactor WindowOperator/Trigger Tests

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
new file mode 100644
index 0000000..050178b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link TriggerTestHarness} specialised for {@code Integer} elements: each processed element is
+ * wrapped in a {@link StreamRecord} whose timestamp is the element's own value.
+ */
+public class SimpleTriggerTestHarness<W extends Window> extends TriggerTestHarness<Integer, W> {
+
+	/**
+	 * Creates the harness by passing both arguments straight through to the parent constructor.
+	 *
+	 * @param trigger the trigger under test
+	 * @param windowSerializer serializer for windows of type {@code W}
+	 * @throws Exception if the parent constructor fails
+	 */
+	public SimpleTriggerTestHarness(
+			Trigger<Integer, W> trigger,
+			TypeSerializer<W> windowSerializer) throws Exception {
+		super(trigger, windowSerializer);
+	}
+
+	/**
+	 * Processes {@code element} for the given window, using the element's value as the record
+	 * timestamp.
+	 *
+	 * @param element the value to process; it also serves as the timestamp
+	 * @param window the window the element is processed for
+	 * @return the result reported by the parent harness for the wrapped record
+	 * @throws Exception if the parent harness fails while processing the record
+	 */
+	public TriggerResult processElement(Integer element, W window) throws Exception {
+		// The value doubles as the event timestamp (unboxed and widened to long).
+		final long timestamp = element;
+		final StreamRecord<Integer> record = new StreamRecord<>(element, timestamp);
+		return super.processElement(record, window);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
new file mode 100644
index 0000000..4599d19
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SlidingEventTimeWindows}.
+ */
+public class SlidingEventTimeWindowsTest extends TestLogger {
+
+	/**
+	 * 5000 ms windows sliding by 1000 ms without offset: checks the assigned windows at timestamp 0,
+	 * at the last millisecond before a slide boundary (4999) and exactly on the boundary (5000).
+	 */
+	@Test
+	public void testWindowAssignment() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingEventTimeWindows assigner =
+				SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder(
+				timeWindow(-4000, 1000),
+				timeWindow(-3000, 2000),
+				timeWindow(-2000, 3000),
+				timeWindow(-1000, 4000),
+				timeWindow(0, 5000)));
+
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), containsInAnyOrder(
+				timeWindow(0, 5000),
+				timeWindow(1000, 6000),
+				timeWindow(2000, 7000),
+				timeWindow(3000, 8000),
+				timeWindow(4000, 9000)));
+
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), containsInAnyOrder(
+				timeWindow(1000, 6000),
+				timeWindow(2000, 7000),
+				timeWindow(3000, 8000),
+				timeWindow(4000, 9000),
+				timeWindow(5000, 10000)));
+	}
+
+	/**
+	 * Same scenario as {@link #testWindowAssignment()} but with a 100 ms offset, so all probed
+	 * timestamps and all expected window bounds are shifted by 100.
+	 */
+	@Test
+	public void testWindowAssignmentWithOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingEventTimeWindows assigner =
+				SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100));
+
+		assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder(
+				timeWindow(-3900, 1100),
+				timeWindow(-2900, 2100),
+				timeWindow(-1900, 3100),
+				timeWindow(-900, 4100),
+				timeWindow(100, 5100)));
+
+		assertThat(assigner.assignWindows("String", 5099L, mockContext), containsInAnyOrder(
+				timeWindow(100, 5100),
+				timeWindow(1100, 6100),
+				timeWindow(2100, 7100),
+				timeWindow(3100, 8100),
+				timeWindow(4100, 9100)));
+
+		assertThat(assigner.assignWindows("String", 5100L, mockContext), containsInAnyOrder(
+				timeWindow(1100, 6100),
+				timeWindow(2100, 7100),
+				timeWindow(3100, 8100),
+				timeWindow(4100, 9100),
+				timeWindow(5100, 10100)));
+	}
+
+	/**
+	 * Sanity check that size and slide given in seconds behave like their millisecond equivalents
+	 * (5 s size, 1 s slide, 500 ms offset).
+	 */
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500));
+
+		assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder(
+				timeWindow(-4500, 500),
+				timeWindow(-3500, 1500),
+				timeWindow(-2500, 2500),
+				timeWindow(-1500, 3500),
+				timeWindow(-500, 4500)));
+
+		assertThat(assigner.assignWindows("String", 5499L, mockContext), containsInAnyOrder(
+				timeWindow(500, 5500),
+				timeWindow(1500, 6500),
+				timeWindow(2500, 7500),
+				timeWindow(3500, 8500),
+				timeWindow(4500, 9500)));
+
+		// Exactly on the slide boundary (offset + 5000), mirroring the sibling tests: the window
+		// starting at 500 no longer contains the timestamp, the one starting at 5500 does.
+		assertThat(assigner.assignWindows("String", 5500L, mockContext), containsInAnyOrder(
+				timeWindow(1500, 6500),
+				timeWindow(2500, 7500),
+				timeWindow(3500, 8500),
+				timeWindow(4500, 9500),
+				timeWindow(5500, 10500)));
+	}
+
+	/**
+	 * A negative size, a negative slide and a negative offset must each be rejected with an
+	 * {@link IllegalArgumentException} that mentions the violated constraint.
+	 */
+	@Test
+	public void testInvalidParameters() {
+		// negative size
+		try {
+			SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+
+		// negative slide
+		try {
+			SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+
+
+		// negative offset
+		try {
+			SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+	}
+
+	/**
+	 * The assigner must report event time, use the {@link TimeWindow} serializer and default to an
+	 * {@link EventTimeTrigger}.
+	 */
+	@Test
+	public void testProperties() {
+		SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+		assertTrue(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
new file mode 100644
index 0000000..20a9924
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlidingProcessingTimeWindows}
+ */
+public class SlidingProcessingTimeWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingProcessingTimeWindows assigner =
+				SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(-4000, 1000),
+				timeWindow(-3000, 2000),
+				timeWindow(-2000, 3000),
+				timeWindow(-1000, 4000),
+				timeWindow(0, 5000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(0, 5000),
+				timeWindow(1000, 6000),
+				timeWindow(2000, 7000),
+				timeWindow(3000, 8000),
+				timeWindow(4000, 9000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(1000, 6000),
+				timeWindow(2000, 7000),
+				timeWindow(3000, 8000),
+				timeWindow(4000, 9000),
+				timeWindow(5000, 10000)));
+	}
+
+	@Test
+	public void testWindowAssignmentWithOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingProcessingTimeWindows assigner =
+				SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(-3900, 1100),
+				timeWindow(-2900, 2100),
+				timeWindow(-1900, 3100),
+				timeWindow(-900, 4100),
+				timeWindow(100, 5100)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(100, 5100),
+				timeWindow(1100, 6100),
+				timeWindow(2100, 7100),
+				timeWindow(3100, 8100),
+				timeWindow(4100, 9100)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(1100, 6100),
+				timeWindow(2100, 7100),
+				timeWindow(3100, 8100),
+				timeWindow(4100, 9100),
+				timeWindow(5100, 10100)));
+	}
+
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(-4500, 500),
+				timeWindow(-3500, 1500),
+				timeWindow(-2500, 2500),
+				timeWindow(-1500, 3500),
+				timeWindow(-500, 4500)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5499L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(500, 5500),
+				timeWindow(1500, 6500),
+				timeWindow(2500, 7500),
+				timeWindow(3500, 8500),
+				timeWindow(4500, 9500)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+				timeWindow(500, 5500),
+				timeWindow(1500, 6500),
+				timeWindow(2500, 7500),
+				timeWindow(3500, 8500),
+				timeWindow(4500, 9500)));
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		try {
+			SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+
+		try {
+			SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+
+
+		try {
+			SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+		}
+	}
+
+	@Test
+	public void testProperties() {
+		SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+		assertFalse(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
new file mode 100644
index 0000000..bb07996
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Hamcrest matchers that are useful for working with {@link StreamRecord StreamRecords}, with
+ * {@link WindowedValue WindowedValues} wrapped in stream records, and with {@link TimeWindow
+ * TimeWindows}.
+ *
+ * <p>Overloads that omit the timestamp or the window accept any value for the omitted part.
+ */
+public class StreamRecordMatchers {
+
+  /** Matches a record whose value equals {@code value}; the timestamp is not checked. */
+  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+          T value) {
+
+    return isStreamRecord(Matchers.equalTo(value));
+  }
+
+  /** Matches a record whose value equals {@code value} and whose timestamp equals {@code timestamp}. */
+  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+      T value,
+      long timestamp) {
+
+    return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+  }
+
+  /** Matches a record whose value satisfies {@code valueMatcher}; the timestamp is not checked. */
+  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+          Matcher<? super T> valueMatcher) {
+    return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
+  }
+
+  /** Matches a record whose value and timestamp satisfy the respective matchers. */
+  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+      Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+    return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
+  }
+
+  /** Matches a {@link TimeWindow} equal to {@code new TimeWindow(start, end)}. */
+  public static Matcher<TimeWindow> timeWindow(long start, long end) {
+    return Matchers.equalTo(new TimeWindow(start, end));
+  }
+
+  /**
+   * Matches an iterable of windows that contains exactly one window per given matcher, in any
+   * order (delegates to {@code Matchers.containsInAnyOrder}).
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @SafeVarargs
+  public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
+    return (Matcher) Matchers.containsInAnyOrder(windows);
+  }
+
+  /** Matches a windowed value equal to {@code value}; timestamp and window are not checked. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          T value) {
+    return isWindowedValue(Matchers.equalTo(value));
+  }
+
+  /** Matches a windowed value equal to {@code value} with the given record timestamp; the window is not checked. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          T value,
+          long timestamp) {
+    return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+  }
+
+  /** Matches a windowed value equal to {@code value} with the given record timestamp and window. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          T value,
+          long timestamp,
+          W window) {
+    return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+  }
+
+
+  /** Matches a windowed value satisfying {@code valueMatcher} with the given record timestamp; the window is not checked. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher, long timestamp) {
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
+  }
+
+  /** Matches a windowed value satisfying {@code valueMatcher} with the given record timestamp and window. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher, long timestamp, W window) {
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
+  }
+
+
+  /** Matches a windowed value satisfying {@code valueMatcher}; timestamp and window are not checked. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher) {
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+  }
+
+
+  /** Matches a windowed value whose value and record timestamp satisfy the matchers; the window is not checked. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+  }
+
+  /** Matches a windowed value satisfying {@code valueMatcher} with the given record timestamp and a window satisfying {@code windowMatcher}. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
+  }
+
+  /** Matches a windowed value whose value, record timestamp and window satisfy the respective matchers. */
+  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+          Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
+    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
+  }
+
+
+  /** Static utility class; not meant to be instantiated. */
+  private StreamRecordMatchers() {}
+
+  /** Matches a {@link StreamRecord} by applying one matcher to its value and one to its timestamp. */
+  private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
+
+    private final Matcher<? super T> valueMatcher;
+    private final Matcher<? super Long> timestampMatcher;
+
+    private StreamRecordMatcher(
+        Matcher<? super T> valueMatcher,
+        Matcher<? super Long> timestampMatcher) {
+      this.valueMatcher = valueMatcher;
+      this.timestampMatcher = timestampMatcher;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description
+          .appendText("a StreamRecordValue(").appendValue(valueMatcher)
+          .appendText(", ").appendValue(timestampMatcher)
+          .appendText(")");
+    }
+
+    @Override
+    protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
+      return valueMatcher.matches(streamRecord.getValue())
+              && timestampMatcher.matches(streamRecord.getTimestamp());
+    }
+  }
+
+  /**
+   * Matches a {@link StreamRecord} carrying a {@link WindowedValue}: one matcher is applied to the
+   * wrapped value, one to the record timestamp and one to the window.
+   */
+  private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
+
+    private final Matcher<? super T> valueMatcher;
+    private final Matcher<? super Long> timestampMatcher;
+    private final Matcher<? super W> windowMatcher;
+
+
+    private WindowedValueMatcher(
+            Matcher<? super T> valueMatcher,
+            Matcher<? super Long> timestampMatcher,
+            Matcher<? super W> windowMatcher) {
+      this.valueMatcher = valueMatcher;
+      this.timestampMatcher = timestampMatcher;
+      this.windowMatcher = windowMatcher;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      // Third component is the window matcher (previously the timestamp matcher was printed twice).
+      description
+              .appendText("a WindowedValue(").appendValue(valueMatcher)
+              .appendText(", ").appendValue(timestampMatcher)
+              .appendText(", ").appendValue(windowMatcher)
+              .appendText(")");
+    }
+
+    @Override
+    protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
+      return valueMatcher.matches(streamRecord.getValue().value())
+              && timestampMatcher.matches(streamRecord.getTimestamp())
+              && windowMatcher.matches(streamRecord.getValue().window());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
new file mode 100644
index 0000000..b9923f2
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+	private static final Integer KEY = 1;
+
+	private final Trigger<T, W> trigger;
+	private final TypeSerializer<W> windowSerializer;
+
+	private final HeapKeyedStateBackend<Integer> stateBackend;
+	private final TestInternalTimerService<Integer, W> internalTimerService;
+
+	public TriggerTestHarness(
+			Trigger<T, W> trigger,
+			TypeSerializer<W> windowSerializer) throws Exception {
+		this.trigger = trigger;
+		this.windowSerializer = windowSerializer;
+
+		// we only ever use one key, other tests make sure that windows work across different
+		// keys
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		MemoryStateBackend backend = new MemoryStateBackend();
+
+		@SuppressWarnings("unchecked")
+		HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				1,
+				new KeyGroupRange(0, 0),
+				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+		this.stateBackend = stateBackend;
+
+		this.stateBackend.setCurrentKey(0);
+
+		this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+			@Override
+			public void setCurrentKey(Object key) {
+				// ignore
+			}
+
+			@Override
+			public Object getCurrentKey() {
+				return KEY;
+			}
+		});
+	}
+
+	public int numProcessingTimeTimers() {
+		return internalTimerService.numProcessingTimeTimers();
+	}
+
+	public int numProcessingTimeTimers(W window) {
+		return internalTimerService.numProcessingTimeTimers(window);
+	}
+
+	public int numEventTimeTimers() {
+		return internalTimerService.numEventTimeTimers();
+	}
+
+	public int numEventTimeTimers(W window) {
+		return internalTimerService.numEventTimeTimers(window);
+	}
+
+	public int numStateEntries() {
+		return stateBackend.numStateEntries();
+	}
+
+	public int numStateEntries(W window) {
+		return stateBackend.numStateEntries(window);
+	}
+
+	/**
+	 * Injects one element into the trigger for the given window and returns the result of
+	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+	 */
+	public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+		TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+				KEY,
+				window,
+				internalTimerService,
+				stateBackend,
+				windowSerializer);
+		return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
+	}
+
+	/**
+	 * Advanced processing time and checks whether we have exactly one firing for the given
+	 * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
+	 * is returned for that firing.
+	 */
+	public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+		Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
+
+		if (firings.size() != 1) {
+			throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+		}
+
+		Tuple2<W, TriggerResult> firing = firings.iterator().next();
+
+		if (!firing.f0.equals(window)) {
+			throw new IllegalStateException("Trigger fired for another window.");
+		}
+
+		return firing.f1;
+	}
+
+	/**
+	 * Advanced the watermark and checks whether we have exactly one firing for the given
+	 * window. The result of {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}
+	 * is returned for that firing.
+	 */
+	public TriggerResult advanceWatermark(long time, W window) throws Exception {
+		Collection<Tuple2<W, TriggerResult>> firings = advanceWatermark(time);
+
+		if (firings.size() != 1) {
+			throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+		}
+
+		Tuple2<W, TriggerResult> firing = firings.iterator().next();
+
+		if (!firing.f0.equals(window)) {
+			throw new IllegalStateException("Trigger fired for another window.");
+		}
+
+		return firing.f1;
+	}
+
+	/**
+	 * Advanced processing time and processes any timers that fire because of this. The
+	 * window and {@link TriggerResult} for each firing are returned.
+	 */
+	public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long time) throws Exception {
+		Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+				internalTimerService.advanceProcessingTime(time);
+
+		Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>();
+
+		for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+			TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+					KEY,
+					timer.getNamespace(),
+					internalTimerService,
+					stateBackend,
+					windowSerializer);
+
+			TriggerResult triggerResult =
+					trigger.onProcessingTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
+
+			result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+		}
+
+		return result;
+	}
+
+	/**
+	 * Advanced the watermark and processes any timers that fire because of this. The
+	 * window and {@link TriggerResult} for each firing are returned.
+	 */
+	public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time) throws Exception {
+		Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+				internalTimerService.advanceWatermark(time);
+
+		Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>();
+
+		for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+			TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+					KEY,
+					timer.getNamespace(),
+					internalTimerService,
+					stateBackend,
+					windowSerializer);
+
+			TriggerResult triggerResult =
+					trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
+
+			result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+		}
+
+		return result;
+	}
+
+	/**
+	 * Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with the given
+	 * parameters. This also calls {@link Trigger#clear(Window, Trigger.TriggerContext)} on the
+	 * merged windows as does {@link WindowOperator}.
+	 */
+	public void mergeWindows(W targetWindow, Collection<W> mergedWindows) throws Exception {
+		TestOnMergeContext<Integer, W> onMergeContext = new TestOnMergeContext<>(
+				KEY,
+				targetWindow,
+				mergedWindows,
+				internalTimerService,
+				stateBackend,
+				windowSerializer);
+		trigger.onMerge(targetWindow, onMergeContext);
+
+		for (W mergedWindow : mergedWindows) {
+			clearTriggerState(mergedWindow);
+		}
+	}
+
+	/**
+	 * Calls {@link Trigger#clear(Window, Trigger.TriggerContext)} for the given window.
+	 */
+	public void clearTriggerState(W window) throws Exception {
+		TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+				KEY,
+				window,
+				internalTimerService,
+				stateBackend,
+				windowSerializer);
+		trigger.clear(window, triggerContext);
+	}
+
+	private static class TestTriggerContext<K, W extends Window> implements Trigger.TriggerContext {
+
+		protected final InternalTimerService<W> timerService;
+		protected final KeyedStateBackend<Integer> stateBackend;
+		protected final K key;
+		protected final W window;
+		protected final TypeSerializer<W> windowSerializer;
+
+		TestTriggerContext(
+				K key,
+				W window,
+				InternalTimerService<W> timerService,
+				KeyedStateBackend<Integer> stateBackend,
+				TypeSerializer<W> windowSerializer) {
+			this.key = key;
+			this.window = window;
+			this.timerService = timerService;
+			this.stateBackend = stateBackend;
+			this.windowSerializer = windowSerializer;
+		}
+
+		@Override
+		public long getCurrentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			return null;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return timerService.currentWatermark();
+		}
+
+		@Override
+		public void registerProcessingTimeTimer(long time) {
+			timerService.registerProcessingTimeTimer(window, time);
+		}
+
+		@Override
+		public void registerEventTimeTimer(long time) {
+			timerService.registerEventTimeTimer(window, time);
+		}
+
+		@Override
+		public void deleteProcessingTimeTimer(long time) {
+			timerService.deleteProcessingTimeTimer(window, time);
+		}
+
+		@Override
+		public void deleteEventTimeTimer(long time) {
+			timerService.deleteEventTimeTimer(window, time);
+		}
+
+		@Override
+		public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
+			try {
+				return stateBackend.getPartitionedState(window, windowSerializer, stateDescriptor);
+			} catch (Exception e) {
+				throw new RuntimeException("Error getting state", e);
+			}
+		}
+
+		@Override
+		public <S extends Serializable> ValueState<S> getKeyValueState(
+				String name, Class<S> stateType, S defaultState) {
+			return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
+		}
+
+		@Override
+		public <S extends Serializable> ValueState<S> getKeyValueState(
+				String name, TypeInformation<S> stateType, S defaultState) {
+			return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
+		}
+	}
+
+	private static class TestOnMergeContext<K, W extends Window> extends TestTriggerContext<K, W> implements Trigger.OnMergeContext {
+
+		private final Collection<W> mergedWindows;
+
+		public TestOnMergeContext(
+				K key,
+				W targetWindow,
+				Collection<W> mergedWindows,
+				InternalTimerService<W> timerService,
+				KeyedStateBackend<Integer> stateBackend,
+				TypeSerializer<W> windowSerializer) {
+			super(key, targetWindow, timerService, stateBackend, windowSerializer);
+
+			this.mergedWindows = mergedWindows;
+		}
+
+		@Override
+		public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
+			try {
+				S rawState = stateBackend.getOrCreateKeyedState(windowSerializer, stateDescriptor);
+
+				if (rawState instanceof InternalMergingState) {
+					@SuppressWarnings("unchecked")
+					InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+					mergingState.mergeNamespaces(window, mergedWindows);
+				}
+				else {
+					throw new IllegalArgumentException(
+							"The given state descriptor does not refer to a mergeable state (MergingState)");
+				}
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Error while merging state.", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
new file mode 100644
index 0000000..2373a86
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link TumblingEventTimeWindows}
+ */
+public class TumblingEventTimeWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 5000)));
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(0, 5000)));
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 10000)));
+	}
+
+	@Test
+	public void testWindowAssignmentWithOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
+
+		assertThat(assigner.assignWindows("String", 100L, mockContext), contains(timeWindow(100, 5100)));
+		assertThat(assigner.assignWindows("String", 5099L, mockContext), contains(timeWindow(100, 5100)));
+		assertThat(assigner.assignWindows("String", 5100L, mockContext), contains(timeWindow(5100, 10100)));
+	}
+
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(1));
+
+		assertThat(assigner.assignWindows("String", 1000L, mockContext), contains(timeWindow(1000, 6000)));
+		assertThat(assigner.assignWindows("String", 5999L, mockContext), contains(timeWindow(1000, 6000)));
+		assertThat(assigner.assignWindows("String", 6000L, mockContext), contains(timeWindow(6000, 11000)));
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		try {
+			TumblingEventTimeWindows.of(Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+
+		try {
+			TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+
+		try {
+			TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+	}
+
+	@Test
+	public void testProperties() {
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+		assertTrue(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
new file mode 100644
index 0000000..348b6fa
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link TumblingProcessingTimeWindows}
+ */
+public class TumblingProcessingTimeWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
+	}
+
+	@Test
+	public void testWindowAssignmentWithOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
+	}
+
+	@Test
+	public void testTimeUnits() {
+		// sanity check with one other time unit
+
+		WindowAssigner.WindowAssignerContext mockContext =
+				mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		try {
+			TumblingProcessingTimeWindows.of(Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+
+		try {
+			TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+
+		try {
+			TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 <= offset < size"));
+		}
+	}
+
+	@Test
+	public void testProperties() {
+		TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+		assertFalse(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+	}
+}