You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 14:01:43 UTC
[03/10] flink git commit: [FLINK-4552] Refactor
WindowOperator/Trigger Tests
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
new file mode 100644
index 0000000..050178b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link TriggerTestHarness} specialised for {@code Integer} elements: the value of each
+ * element doubles as the timestamp of the {@link StreamRecord} that is handed to the trigger.
+ */
+public class SimpleTriggerTestHarness<W extends Window> extends TriggerTestHarness<Integer, W> {
+
+    public SimpleTriggerTestHarness(
+            Trigger<Integer, W> trigger,
+            TypeSerializer<W> windowSerializer) throws Exception {
+        super(trigger, windowSerializer);
+    }
+
+    /**
+     * Wraps {@code element} in a {@link StreamRecord} whose timestamp equals the element's value
+     * and forwards it to {@link TriggerTestHarness#processElement(StreamRecord, Window)}.
+     */
+    public TriggerResult processElement(Integer element, W window) throws Exception {
+        StreamRecord<Integer> record = new StreamRecord<>(element, element);
+        return super.processElement(record, window);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
new file mode 100644
index 0000000..4599d19
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SlidingEventTimeWindows}.
+ */
+public class SlidingEventTimeWindowsTest extends TestLogger {
+
+    /**
+     * 5 s windows sliding by 1 s, no offset. Probes the first timestamp of the {@code [0, 5000)}
+     * window, its last timestamp, and the first timestamp that no longer belongs to it.
+     */
+    @Test
+    public void testWindowAssignment() {
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingEventTimeWindows assigner =
+                SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000));
+
+        assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder(
+                timeWindow(-4000, 1000),
+                timeWindow(-3000, 2000),
+                timeWindow(-2000, 3000),
+                timeWindow(-1000, 4000),
+                timeWindow(0, 5000)));
+
+        assertThat(assigner.assignWindows("String", 4999L, mockContext), containsInAnyOrder(
+                timeWindow(0, 5000),
+                timeWindow(1000, 6000),
+                timeWindow(2000, 7000),
+                timeWindow(3000, 8000),
+                timeWindow(4000, 9000)));
+
+        assertThat(assigner.assignWindows("String", 5000L, mockContext), containsInAnyOrder(
+                timeWindow(1000, 6000),
+                timeWindow(2000, 7000),
+                timeWindow(3000, 8000),
+                timeWindow(4000, 9000),
+                timeWindow(5000, 10000)));
+    }
+
+    /**
+     * Same layout as {@link #testWindowAssignment()} but with every window shifted by an offset
+     * of 100 ms, so the probed boundaries move to 100, 5099 and 5100.
+     */
+    @Test
+    public void testWindowAssignmentWithOffset() {
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingEventTimeWindows assigner =
+                SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100));
+
+        assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder(
+                timeWindow(-3900, 1100),
+                timeWindow(-2900, 2100),
+                timeWindow(-1900, 3100),
+                timeWindow(-900, 4100),
+                timeWindow(100, 5100)));
+
+        assertThat(assigner.assignWindows("String", 5099L, mockContext), containsInAnyOrder(
+                timeWindow(100, 5100),
+                timeWindow(1100, 6100),
+                timeWindow(2100, 7100),
+                timeWindow(3100, 8100),
+                timeWindow(4100, 9100)));
+
+        assertThat(assigner.assignWindows("String", 5100L, mockContext), containsInAnyOrder(
+                timeWindow(1100, 6100),
+                timeWindow(2100, 7100),
+                timeWindow(3100, 8100),
+                timeWindow(4100, 9100),
+                timeWindow(5100, 10100)));
+    }
+
+    /**
+     * Sanity check that size and slide given in seconds behave like their millisecond
+     * equivalents (5 s size, 1 s slide, 500 ms offset).
+     */
+    @Test
+    public void testTimeUnits() {
+        // sanity check with one other time unit
+
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500));
+
+        assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder(
+                timeWindow(-4500, 500),
+                timeWindow(-3500, 1500),
+                timeWindow(-2500, 2500),
+                timeWindow(-1500, 3500),
+                timeWindow(-500, 4500)));
+
+        // last timestamp that still belongs to the [500, 5500) window
+        assertThat(assigner.assignWindows("String", 5499L, mockContext), containsInAnyOrder(
+                timeWindow(500, 5500),
+                timeWindow(1500, 6500),
+                timeWindow(2500, 7500),
+                timeWindow(3500, 8500),
+                timeWindow(4500, 9500)));
+
+        // first timestamp past the [500, 5500) window: the window set shifts by one slide.
+        // (Probing 5100 here would only repeat the previous expectation.)
+        assertThat(assigner.assignWindows("String", 5500L, mockContext), containsInAnyOrder(
+                timeWindow(1500, 6500),
+                timeWindow(2500, 7500),
+                timeWindow(3500, 8500),
+                timeWindow(4500, 9500),
+                timeWindow(5500, 10500)));
+    }
+
+    /**
+     * A negative size, a negative slide and a negative offset must each be rejected with an
+     * {@link IllegalArgumentException} whose text names the violated constraint.
+     */
+    @Test
+    public void testInvalidParameters() {
+        // negative size
+        try {
+            SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+
+        // negative slide
+        try {
+            SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+
+        // negative offset
+        try {
+            SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+    }
+
+    /**
+     * Checks the static properties of the assigner: it is event-time based, serializes windows
+     * with {@link TimeWindow.Serializer} and defaults to an {@link EventTimeTrigger}.
+     */
+    @Test
+    public void testProperties() {
+        SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+        assertTrue(assigner.isEventTime());
+        assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+        assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
new file mode 100644
index 0000000..20a9924
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlidingProcessingTimeWindows}.
+ *
+ * <p>Every call passes {@code Long.MIN_VALUE} as the element timestamp and expects windows that
+ * are derived from the mocked processing time instead.
+ */
+public class SlidingProcessingTimeWindowsTest extends TestLogger {
+
+    /**
+     * 5 s windows sliding by 1 s, no offset. Probes the first processing time of the
+     * {@code [0, 5000)} window, its last one, and the first one that no longer belongs to it.
+     */
+    @Test
+    public void testWindowAssignment() {
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingProcessingTimeWindows assigner =
+                SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(-4000, 1000),
+                timeWindow(-3000, 2000),
+                timeWindow(-2000, 3000),
+                timeWindow(-1000, 4000),
+                timeWindow(0, 5000)));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(0, 5000),
+                timeWindow(1000, 6000),
+                timeWindow(2000, 7000),
+                timeWindow(3000, 8000),
+                timeWindow(4000, 9000)));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(1000, 6000),
+                timeWindow(2000, 7000),
+                timeWindow(3000, 8000),
+                timeWindow(4000, 9000),
+                timeWindow(5000, 10000)));
+    }
+
+    /**
+     * Same layout as {@link #testWindowAssignment()} but with every window shifted by an offset
+     * of 100 ms, so the probed boundaries move to 100, 5099 and 5100.
+     */
+    @Test
+    public void testWindowAssignmentWithOffset() {
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingProcessingTimeWindows assigner =
+                SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(-3900, 1100),
+                timeWindow(-2900, 2100),
+                timeWindow(-1900, 3100),
+                timeWindow(-900, 4100),
+                timeWindow(100, 5100)));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(100, 5100),
+                timeWindow(1100, 6100),
+                timeWindow(2100, 7100),
+                timeWindow(3100, 8100),
+                timeWindow(4100, 9100)));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(1100, 6100),
+                timeWindow(2100, 7100),
+                timeWindow(3100, 8100),
+                timeWindow(4100, 9100),
+                timeWindow(5100, 10100)));
+    }
+
+    /**
+     * Sanity check that size and slide given in seconds behave like their millisecond
+     * equivalents (5 s size, 1 s slide, 500 ms offset).
+     */
+    @Test
+    public void testTimeUnits() {
+        // sanity check with one other time unit
+
+        WindowAssigner.WindowAssignerContext mockContext =
+                mock(WindowAssigner.WindowAssignerContext.class);
+
+        SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500));
+
+        when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(-4500, 500),
+                timeWindow(-3500, 1500),
+                timeWindow(-2500, 2500),
+                timeWindow(-1500, 3500),
+                timeWindow(-500, 4500)));
+
+        // last processing time that still belongs to the [500, 5500) window
+        when(mockContext.getCurrentProcessingTime()).thenReturn(5499L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(500, 5500),
+                timeWindow(1500, 6500),
+                timeWindow(2500, 7500),
+                timeWindow(3500, 8500),
+                timeWindow(4500, 9500)));
+
+        // first processing time past the [500, 5500) window: the window set shifts by one slide.
+        // (Probing 5100 here would only repeat the previous expectation.)
+        when(mockContext.getCurrentProcessingTime()).thenReturn(5500L);
+        assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+                timeWindow(1500, 6500),
+                timeWindow(2500, 7500),
+                timeWindow(3500, 8500),
+                timeWindow(4500, 9500),
+                timeWindow(5500, 10500)));
+    }
+
+    /**
+     * A negative size, a negative slide and a negative offset must each be rejected with an
+     * {@link IllegalArgumentException} whose text names the violated constraint.
+     */
+    @Test
+    public void testInvalidParameters() {
+        // negative size
+        try {
+            SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+
+        // negative slide
+        try {
+            SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+
+        // negative offset
+        try {
+            SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+        }
+    }
+
+    /**
+     * Checks the static properties of the assigner: it is not event-time based, serializes
+     * windows with {@link TimeWindow.Serializer} and defaults to a {@link ProcessingTimeTrigger}.
+     */
+    @Test
+    public void testProperties() {
+        SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+        assertFalse(assigner.isEventTime());
+        assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+        assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
new file mode 100644
index 0000000..bb07996
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Hamcrest matchers for asserting on {@link StreamRecord StreamRecords}, on the
+ * {@link WindowedValue WindowedValues} they may carry, and on {@link TimeWindow TimeWindows}.
+ *
+ * <p>Overloads that omit the timestamp or the window accept any value for the omitted part.
+ */
+public class StreamRecordMatchers {
+
+    /** Matches a record whose value equals {@code value}; the timestamp is not checked. */
+    public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+            T value) {
+
+        return isStreamRecord(Matchers.equalTo(value));
+    }
+
+    /** Matches a record whose value equals {@code value} and whose timestamp equals {@code timestamp}. */
+    public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+            T value,
+            long timestamp) {
+
+        return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+    }
+
+    /** Matches a record whose value satisfies {@code valueMatcher}; the timestamp is not checked. */
+    public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+            Matcher<? super T> valueMatcher) {
+        return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
+    }
+
+    /** Matches a record whose value and timestamp satisfy the respective matchers. */
+    public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+            Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+        return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
+    }
+
+    /** Matches a {@link TimeWindow} equal to {@code new TimeWindow(start, end)}. */
+    public static Matcher<TimeWindow> timeWindow(long start, long end) {
+        return Matchers.equalTo(new TimeWindow(start, end));
+    }
+
+    /** Matches an iterable that contains exactly the given windows, in any order. */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SafeVarargs
+    public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
+        return (Matcher) Matchers.containsInAnyOrder(windows);
+    }
+
+    /** Matches a windowed value equal to {@code value}; timestamp and window are not checked. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            T value) {
+        return isWindowedValue(Matchers.equalTo(value));
+    }
+
+    /** Matches a windowed value equal to {@code value} with the given timestamp; the window is not checked. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            T value,
+            long timestamp) {
+        return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+    }
+
+    /** Matches a windowed value equal to {@code value} with the given timestamp and window. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            T value,
+            long timestamp,
+            W window) {
+        return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+    }
+
+    /** Matches a windowed value satisfying {@code valueMatcher} with the given timestamp; the window is not checked. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher, long timestamp) {
+        return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
+    }
+
+    /** Matches a windowed value satisfying {@code valueMatcher} with the given timestamp and window. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher, long timestamp, W window) {
+        return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
+    }
+
+    /** Matches a windowed value satisfying {@code valueMatcher}; timestamp and window are not checked. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher) {
+        return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+    }
+
+    /** Matches a windowed value whose value and timestamp satisfy the matchers; the window is not checked. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+        return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+    }
+
+    /** Matches a windowed value satisfying {@code valueMatcher} and {@code windowMatcher} with the given timestamp. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
+        return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
+    }
+
+    /** Matches a windowed value whose value, timestamp and window satisfy the respective matchers. */
+    public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+            Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
+        return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
+    }
+
+    /** Static utility class; not meant to be instantiated. */
+    private StreamRecordMatchers() {}
+
+    /** Matches a {@link StreamRecord} by applying separate matchers to its value and its timestamp. */
+    private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
+
+        private final Matcher<? super T> valueMatcher;
+        private final Matcher<? super Long> timestampMatcher;
+
+        private StreamRecordMatcher(
+                Matcher<? super T> valueMatcher,
+                Matcher<? super Long> timestampMatcher) {
+            this.valueMatcher = valueMatcher;
+            this.timestampMatcher = timestampMatcher;
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description
+                    .appendText("a StreamRecordValue(").appendValue(valueMatcher)
+                    .appendText(", ").appendValue(timestampMatcher)
+                    .appendText(")");
+        }
+
+        @Override
+        protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
+            return valueMatcher.matches(streamRecord.getValue())
+                    && timestampMatcher.matches(streamRecord.getTimestamp());
+        }
+    }
+
+    /**
+     * Matches a {@link StreamRecord} carrying a {@link WindowedValue} by applying separate
+     * matchers to the wrapped value, the record timestamp and the window.
+     */
+    private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
+
+        private final Matcher<? super T> valueMatcher;
+        private final Matcher<? super Long> timestampMatcher;
+        private final Matcher<? super W> windowMatcher;
+
+        private WindowedValueMatcher(
+                Matcher<? super T> valueMatcher,
+                Matcher<? super Long> timestampMatcher,
+                Matcher<? super W> windowMatcher) {
+            this.valueMatcher = valueMatcher;
+            this.timestampMatcher = timestampMatcher;
+            this.windowMatcher = windowMatcher;
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            // Describe all three parts; the third slot is the window matcher, not the
+            // timestamp matcher a second time.
+            description
+                    .appendText("a WindowedValue(").appendValue(valueMatcher)
+                    .appendText(", ").appendValue(timestampMatcher)
+                    .appendText(", ").appendValue(windowMatcher)
+                    .appendText(")");
+        }
+
+        @Override
+        protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
+            return valueMatcher.matches(streamRecord.getValue().value())
+                    && timestampMatcher.matches(streamRecord.getTimestamp())
+                    && windowMatcher.matches(streamRecord.getValue().window());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
new file mode 100644
index 0000000..3fdc94f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+    /**
+     * Creates a harness for {@code trigger}, backed by a heap keyed state backend obtained from
+     * a {@link MemoryStateBackend} and by a {@link TestInternalTimerService}. Both operate on
+     * the single fixed key {@code KEY}.
+     *
+     * @param trigger the trigger under test
+     * @param windowSerializer serializer for the window type, handed to the trigger contexts
+     */
+    public TriggerTestHarness(
+            Trigger<T, W> trigger,
+            TypeSerializer<W> windowSerializer) throws Exception {
+        this.trigger = trigger;
+        this.windowSerializer = windowSerializer;
+
+        // we only ever use one key, other tests make sure that windows work across different
+        // keys
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        MemoryStateBackend backend = new MemoryStateBackend();
+
+        // a single key group (index 0) is enough because only one key is ever used
+        @SuppressWarnings("unchecked")
+        HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(
+                dummyEnv,
+                new JobID(),
+                "test_op",
+                IntSerializer.INSTANCE,
+                1,
+                new KeyGroupRange(0, 0),
+                new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+        this.stateBackend = heapBackend;
+
+        // use the same key that the timer service and the trigger contexts report, instead of
+        // an unrelated literal
+        this.stateBackend.setCurrentKey(KEY);
+
+        this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+            @Override
+            public void setCurrentKey(Object key) {
+                // ignore
+            }
+
+            @Override
+            public Object getCurrentKey() {
+                return KEY;
+            }
+        });
+    }
+
+    /** Number of processing-time timers reported by the timer service. */
+    public int numProcessingTimeTimers() {
+        final int timers = internalTimerService.numProcessingTimeTimers();
+        return timers;
+    }
+
+    /** Number of processing-time timers the timer service reports for {@code window}. */
+    public int numProcessingTimeTimers(W window) {
+        final int timers = internalTimerService.numProcessingTimeTimers(window);
+        return timers;
+    }
+
+    /** Number of event-time timers reported by the timer service. */
+    public int numEventTimeTimers() {
+        final int timers = internalTimerService.numEventTimeTimers();
+        return timers;
+    }
+
+    /** Number of event-time timers the timer service reports for {@code window}. */
+    public int numEventTimeTimers(W window) {
+        final int timers = internalTimerService.numEventTimeTimers(window);
+        return timers;
+    }
+
+    /** Number of state entries reported by the state backend. */
+    public int numStateEntries() {
+        final int entries = stateBackend.numStateEntries();
+        return entries;
+    }
+
+    /** Number of state entries the state backend reports for {@code window}. */
+    public int numStateEntries(W window) {
+        final int entries = stateBackend.numStateEntries(window);
+        return entries;
+    }
+
+    /**
+     * Feeds a single element to the trigger for {@code window} and returns whatever
+     * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} returns.
+     */
+    public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+        // a fresh context per call, scoped to the fixed key and the given window
+        TestTriggerContext<Integer, W> context =
+                new TestTriggerContext<>(KEY, window, internalTimerService, stateBackend, windowSerializer);
+
+        final T value = element.getValue();
+        final long timestamp = element.getTimestamp();
+
+        return trigger.onElement(value, timestamp, window, context);
+    }
+
+    /**
+     * Advances processing time and verifies that exactly one timer fires and that it fires for
+     * the given window. Returns the result of
+     * {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)} for that firing.
+     *
+     * @throws IllegalStateException if the number of firings is not one, or if the single firing
+     *         belongs to a different window
+     */
+    public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+        Collection<Tuple2<W, TriggerResult>> fired = advanceProcessingTime(time);
+
+        if (fired.size() == 1) {
+            Tuple2<W, TriggerResult> only = fired.iterator().next();
+            if (only.f0.equals(window)) {
+                return only.f1;
+            }
+            throw new IllegalStateException("Trigger fired for another window.");
+        }
+
+        throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + fired);
+    }
+
+    /**
+     * Advances the watermark and verifies that exactly one timer fires and that it fires for
+     * the given window. Returns the result of
+     * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} for that firing.
+     *
+     * @throws IllegalStateException if the number of firings is not one, or if the single firing
+     *         belongs to a different window
+     */
+    public TriggerResult advanceWatermark(long time, W window) throws Exception {
+        Collection<Tuple2<W, TriggerResult>> fired = advanceWatermark(time);
+
+        if (fired.size() == 1) {
+            Tuple2<W, TriggerResult> only = fired.iterator().next();
+            if (only.f0.equals(window)) {
+                return only.f1;
+            }
+            throw new IllegalStateException("Trigger fired for another window.");
+        }
+
+        throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + fired);
+    }
+
+    /**
+     * Advances processing time and invokes the trigger for every timer that fires as a
+     * consequence. Returns one (window, {@link TriggerResult}) pair per firing.
+     */
+    public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long time) throws Exception {
+        Collection<TestInternalTimerService.Timer<Integer, W>> dueTimers =
+                internalTimerService.advanceProcessingTime(time);
+
+        Collection<Tuple2<W, TriggerResult>> firings = new ArrayList<>();
+
+        for (TestInternalTimerService.Timer<Integer, W> due : dueTimers) {
+            // the timer's namespace is the window it was registered for
+            W firedWindow = due.getNamespace();
+
+            TestTriggerContext<Integer, W> context =
+                    new TestTriggerContext<>(KEY, firedWindow, internalTimerService, stateBackend, windowSerializer);
+
+            TriggerResult outcome = trigger.onProcessingTime(due.getTimestamp(), firedWindow, context);
+            firings.add(new Tuple2<>(firedWindow, outcome));
+        }
+
+        return firings;
+    }
+
+ /**
+  * Advances the watermark and invokes
+  * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} for every timer
+  * that fires as a consequence.
+  *
+  * @param time the new watermark that is handed to the timer service
+  * @return one (window, {@link TriggerResult}) pair per fired timer, in the order in which
+  *         the timer service reported the timers
+  */
+ public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time) throws Exception {
+     Collection<Tuple2<W, TriggerResult>> firings = new ArrayList<>();
+
+     for (TestInternalTimerService.Timer<Integer, W> fired : internalTimerService.advanceWatermark(time)) {
+         W firedWindow = fired.getNamespace();
+
+         // the context is scoped to the window of the fired timer
+         TestTriggerContext<Integer, W> context = new TestTriggerContext<>(
+             KEY, firedWindow, internalTimerService, stateBackend, windowSerializer);
+
+         TriggerResult outcome =
+             trigger.onEventTime(fired.getTimestamp(), firedWindow, context);
+         firings.add(new Tuple2<>(firedWindow, outcome));
+     }
+
+     return firings;
+ }
+
+ /**
+  * Invokes {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} for the target window and
+  * afterwards calls {@link Trigger#clear(Window, Trigger.TriggerContext)} for each of the
+  * merged windows, as {@link WindowOperator} does.
+  *
+  * @param targetWindow the window that is handed to {@code onMerge}
+  * @param mergedWindows the windows whose trigger state is cleared after the merge
+  */
+ public void mergeWindows(W targetWindow, Collection<W> mergedWindows) throws Exception {
+     TestOnMergeContext<Integer, W> mergeContext = new TestOnMergeContext<>(
+         KEY, targetWindow, mergedWindows,
+         internalTimerService, stateBackend, windowSerializer);
+     trigger.onMerge(targetWindow, mergeContext);
+
+     // onMerge has run, now drop the trigger state of every merged window
+     for (W merged : mergedWindows) {
+         clearTriggerState(merged);
+     }
+ }
+
+ /**
+  * Invokes {@link Trigger#clear(Window, Trigger.TriggerContext)} for the given window, using a
+  * trigger context that is scoped to that window.
+  */
+ public void clearTriggerState(W window) throws Exception {
+     // the context resolves timers and partitioned state against the given window
+     TestTriggerContext<Integer, W> windowContext = new TestTriggerContext<>(
+         KEY, window,
+         internalTimerService, stateBackend, windowSerializer);
+
+     trigger.clear(window, windowContext);
+ }
+
+ private static class TestTriggerContext<K, W extends Window> implements Trigger.TriggerContext {
+
+ protected final InternalTimerService<W> timerService;
+ protected final KeyedStateBackend<Integer> stateBackend;
+ protected final K key;
+ protected final W window;
+ protected final TypeSerializer<W> windowSerializer;
+
+ TestTriggerContext(
+ K key,
+ W window,
+ InternalTimerService<W> timerService,
+ KeyedStateBackend<Integer> stateBackend,
+ TypeSerializer<W> windowSerializer) {
+ this.key = key;
+ this.window = window;
+ this.timerService = timerService;
+ this.stateBackend = stateBackend;
+ this.windowSerializer = windowSerializer;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return timerService.currentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return null;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return timerService.currentWatermark();
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ timerService.registerProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ timerService.registerEventTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ timerService.deleteProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ timerService.deleteEventTimeTimer(window, time);
+ }
+
+ @Override
+ public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
+ try {
+ return stateBackend.getPartitionedState(window, windowSerializer, stateDescriptor);
+ } catch (Exception e) {
+ throw new RuntimeException("Error getting state", e);
+ }
+ }
+
+ @Override
+ public <S extends Serializable> ValueState<S> getKeyValueState(
+ String name, Class<S> stateType, S defaultState) {
+ return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
+ }
+
+ @Override
+ public <S extends Serializable> ValueState<S> getKeyValueState(
+ String name, TypeInformation<S> stateType, S defaultState) {
+ return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
+ }
+ }
+
+ private static class TestOnMergeContext<K, W extends Window> extends TestTriggerContext<K, W> implements Trigger.OnMergeContext {
+
+ private final Collection<W> mergedWindows;
+
+ public TestOnMergeContext(
+ K key,
+ W targetWindow,
+ Collection<W> mergedWindows,
+ InternalTimerService<W> timerService,
+ KeyedStateBackend<Integer> stateBackend,
+ TypeSerializer<W> windowSerializer) {
+ super(key, targetWindow, timerService, stateBackend, windowSerializer);
+
+ this.mergedWindows = mergedWindows;
+ }
+
+ @Override
+ public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
+ try {
+ stateBackend.mergePartitionedStates(window, mergedWindows, windowSerializer, stateDescriptor);
+ } catch (Exception e) {
+ throw new RuntimeException("Error merging state", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
new file mode 100644
index 0000000..2373a86
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the window assignment, parameter validation and properties of {@link TumblingEventTimeWindows}.
+ */
+public class TumblingEventTimeWindowsTest extends TestLogger {
+
+    @Test
+    public void testWindowAssignment() {
+        TumblingEventTimeWindows windows = TumblingEventTimeWindows.of(Time.milliseconds(5000));
+        WindowAssigner.WindowAssignerContext ctx = mock(WindowAssigner.WindowAssignerContext.class);
+
+        // 0 and 4999 are expected in the first window, 5000 already in the second one
+        assertThat(windows.assignWindows("String", 0L, ctx), contains(timeWindow(0, 5000)));
+        assertThat(windows.assignWindows("String", 4999L, ctx), contains(timeWindow(0, 5000)));
+        assertThat(windows.assignWindows("String", 5000L, ctx), contains(timeWindow(5000, 10000)));
+    }
+
+    @Test
+    public void testWindowAssignmentWithOffset() {
+        TumblingEventTimeWindows windows = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
+        WindowAssigner.WindowAssignerContext ctx = mock(WindowAssigner.WindowAssignerContext.class);
+
+        // with an offset of 100 ms the expected window boundaries are 100, 5100 and 10100
+        assertThat(windows.assignWindows("String", 100L, ctx), contains(timeWindow(100, 5100)));
+        assertThat(windows.assignWindows("String", 5099L, ctx), contains(timeWindow(100, 5100)));
+        assertThat(windows.assignWindows("String", 5100L, ctx), contains(timeWindow(5100, 10100)));
+    }
+
+    @Test
+    public void testTimeUnits() {
+        // sanity check: size and offset given in seconds, expected windows in milliseconds
+        TumblingEventTimeWindows windows = TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(1));
+        WindowAssigner.WindowAssignerContext ctx = mock(WindowAssigner.WindowAssignerContext.class);
+
+        assertThat(windows.assignWindows("String", 1000L, ctx), contains(timeWindow(1000, 6000)));
+        assertThat(windows.assignWindows("String", 5999L, ctx), contains(timeWindow(1000, 6000)));
+        assertThat(windows.assignWindows("String", 6000L, ctx), contains(timeWindow(6000, 11000)));
+    }
+
+    @Test
+    public void testInvalidParameters() {
+        try {
+            TumblingEventTimeWindows.of(Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException expected) {
+            assertNamesOffsetConstraint(expected);
+        }
+
+        try {
+            TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20));
+            fail("should fail");
+        } catch (IllegalArgumentException expected) {
+            assertNamesOffsetConstraint(expected);
+        }
+
+        try {
+            TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+            fail("should fail");
+        } catch (IllegalArgumentException expected) {
+            assertNamesOffsetConstraint(expected);
+        }
+    }
+
+    @Test
+    public void testProperties() {
+        TumblingEventTimeWindows windows = TumblingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+        assertTrue(windows.isEventTime());
+        assertEquals(new TimeWindow.Serializer(), windows.getWindowSerializer(new ExecutionConfig()));
+        assertThat(windows.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+    }
+
+    /** Asserts that the exception text names the constraint {@code 0 <= offset < size}. */
+    private static void assertNamesOffsetConstraint(IllegalArgumentException e) {
+        assertThat(e.toString(), containsString("0 <= offset < size"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb8586e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
new file mode 100644
index 0000000..348b6fa
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link TumblingProcessingTimeWindows}
+ */
+public class TumblingProcessingTimeWindowsTest extends TestLogger {
+
+ @Test
+ public void testWindowAssignment() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
+ }
+
+ @Test
+ public void testWindowAssignmentWithOffset() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
+ }
+
+ @Test
+ public void testTimeUnits() {
+ // sanity check with one other time unit
+
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
+ }
+
+ @Test
+ public void testInvalidParameters() {
+ try {
+ TumblingProcessingTimeWindows.of(Time.seconds(-1));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("0 <= offset < size"));
+ }
+
+ try {
+ TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("0 <= offset < size"));
+ }
+
+ try {
+ TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("0 <= offset < size"));
+ }
+ }
+
+ @Test
+ public void testProperties() {
+ TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
+
+ assertFalse(assigner.isEventTime());
+ assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+ assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+ }
+}