You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:22 UTC
[03/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
index bb07996..ada4d6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -30,150 +32,146 @@ import org.hamcrest.TypeSafeMatcher;
*/
public class StreamRecordMatchers {
- public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
- T value) {
-
- return isStreamRecord(Matchers.equalTo(value));
- }
-
- public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
- T value,
- long timestamp) {
-
- return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
- }
-
- public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
- Matcher<? super T> valueMatcher) {
- return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
- }
-
- public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
- Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
- return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
- }
-
- public static Matcher<TimeWindow> timeWindow(long start, long end) {
- return Matchers.equalTo(new TimeWindow(start, end));
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @SafeVarargs
- public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
- return (Matcher) Matchers.containsInAnyOrder(windows);
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- T value) {
- return isWindowedValue(Matchers.equalTo(value));
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- T value,
- long timestamp) {
- return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- T value,
- long timestamp,
- W window) {
- return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
- }
-
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher, long timestamp) {
- return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher, long timestamp, W window) {
- return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
- }
-
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
- }
-
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
- }
-
- public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
- return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
- }
-
-
- private StreamRecordMatchers() {}
-
- private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
-
- private Matcher<? super T> valueMatcher;
- private Matcher<? super Long> timestampMatcher;
-
- private StreamRecordMatcher(
- Matcher<? super T> valueMatcher,
- Matcher<? super Long> timestampMatcher) {
- this.valueMatcher = valueMatcher;
- this.timestampMatcher = timestampMatcher;
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a StreamRecordValue(").appendValue(valueMatcher)
- .appendText(", ").appendValue(timestampMatcher)
- .appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
- return valueMatcher.matches(streamRecord.getValue())
- && timestampMatcher.matches(streamRecord.getTimestamp());
- }
- }
-
- private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
-
- private Matcher<? super T> valueMatcher;
- private Matcher<? super Long> timestampMatcher;
- private Matcher<? super W> windowMatcher;
-
-
- private WindowedValueMatcher(
- Matcher<? super T> valueMatcher,
- Matcher<? super Long> timestampMatcher,
- Matcher<? super W> windowMatcher) {
- this.valueMatcher = valueMatcher;
- this.timestampMatcher = timestampMatcher;
- this.windowMatcher = windowMatcher;
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a WindowedValue(").appendValue(valueMatcher)
- .appendText(", ").appendValue(timestampMatcher)
- .appendText(", ").appendValue(timestampMatcher)
- .appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
- return valueMatcher.matches(streamRecord.getValue().value())
- && timestampMatcher.matches(streamRecord.getTimestamp())
- && windowMatcher.matches(streamRecord.getValue().window());
- }
- }
+ public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+ T value) {
+
+ return isStreamRecord(Matchers.equalTo(value));
+ }
+
+ public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+ T value,
+ long timestamp) {
+
+ return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+ }
+
+ public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+ Matcher<? super T> valueMatcher) {
+ return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
+ }
+
+ public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+ Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+ return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
+ }
+
+ public static Matcher<TimeWindow> timeWindow(long start, long end) {
+ return Matchers.equalTo(new TimeWindow(start, end));
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @SafeVarargs
+ public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
+ return (Matcher) Matchers.containsInAnyOrder(windows);
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ T value) {
+ return isWindowedValue(Matchers.equalTo(value));
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ T value,
+ long timestamp) {
+ return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ T value,
+ long timestamp,
+ W window) {
+ return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher, long timestamp) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher, long timestamp, W window) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
+ }
+
+ public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+ Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
+ }
+
+ private StreamRecordMatchers() {
+ }
+
+ private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
+
+ private Matcher<? super T> valueMatcher;
+ private Matcher<? super Long> timestampMatcher;
+
+ private StreamRecordMatcher(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Long> timestampMatcher) {
+ this.valueMatcher = valueMatcher;
+ this.timestampMatcher = timestampMatcher;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("a StreamRecordValue(").appendValue(valueMatcher)
+ .appendText(", ").appendValue(timestampMatcher)
+ .appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
+ return valueMatcher.matches(streamRecord.getValue())
+ && timestampMatcher.matches(streamRecord.getTimestamp());
+ }
+ }
+
+ private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
+
+ private Matcher<? super T> valueMatcher;
+ private Matcher<? super Long> timestampMatcher;
+ private Matcher<? super W> windowMatcher;
+
+ private WindowedValueMatcher(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Long> timestampMatcher,
+ Matcher<? super W> windowMatcher) {
+ this.valueMatcher = valueMatcher;
+ this.timestampMatcher = timestampMatcher;
+ this.windowMatcher = windowMatcher;
+ }
+
+ /**
+ * Describes the expected record as its value, timestamp and window matchers, in that order.
+ *
+ * @param description the description to append the expectation to
+ */
+ @Override
+ public void describeTo(Description description) {
+ // The third component must be the window matcher so that the description mirrors
+ // the three checks performed by matchesSafely (value, timestamp, window).
+ description
+ .appendText("a WindowedValue(").appendValue(valueMatcher)
+ .appendText(", ").appendValue(timestampMatcher)
+ .appendText(", ").appendValue(windowMatcher)
+ .appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
+ return valueMatcher.matches(streamRecord.getValue().value())
+ && timestampMatcher.matches(streamRecord.getTimestamp())
+ && windowMatcher.matches(streamRecord.getValue().window());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 341171d..f4e0f1d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -19,41 +19,45 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
+/**
+ * Tests for {@link TimeWindow}.
+ */
public class TimeWindowTest {
@Test
public void testGetWindowStartWithOffset() {
- //[0,7),[7,14),[14,21)...
+ // [0, 7), [7, 14), [14, 21)...
long offset = 0;
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7);
- //[-4,3),[3,10),[10,17)...
+ // [-4, 3), [3, 10), [10, 17)...
offset = 3;
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10);
- //[-2,5),[5,12),[12,19)...
+ // [-2, 5), [5, 12), [12, 19)...
offset = -2;
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4, offset, 7), -2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 5);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12, offset, 7), 12);
// for GMT+8:00
- offset = - TimeUnit.HOURS.toMillis(8);
+ offset = -TimeUnit.HOURS.toMillis(8);
long size = TimeUnit.DAYS.toMillis(1);
- Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450L, offset, size), 1470844800000L);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index df65ca2..dc0e21c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.FoldFunction;
@@ -28,8 +29,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
@@ -161,7 +162,7 @@ public class TimeWindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -185,7 +186,7 @@ public class TimeWindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.fold(new Tuple2<>("", 1), new DummyFolder());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -235,7 +236,7 @@ public class TimeWindowTranslationTest {
* These tests ensure that the fast aligned time windows operator is used if the
* conditions are right.
*
- * TODO: update once the fast aligned time windows operator is in
+ * <p>TODO: update once the fast aligned time windows operator is in
*/
@Ignore
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 4267444..5aa47e8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.JobID;
@@ -36,9 +37,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TestInternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -123,7 +124,7 @@ public class TriggerTestHarness<T, W extends Window> {
/**
* Injects one element into the trigger for the given window and returns the result of
- * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+ * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}.
*/
public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 2373a86..9e4669f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,17 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
/**
- * Tests for {@link TumblingEventTimeWindows}
+ * Tests for {@link TumblingEventTimeWindows}.
*/
public class TumblingEventTimeWindowsTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index 348b6fa..a611fc0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Tests for {@link TumblingProcessingTimeWindows}
+ * Tests for {@link TumblingProcessingTimeWindows}.
*/
public class TumblingProcessingTimeWindowsTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 0d80605..8ceda45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -15,38 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -64,6 +38,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -73,6 +48,31 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Base for window operator tests that verify correct interaction with the other windowing
* components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
@@ -128,7 +128,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
return mockAssigner;
}
-
static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception {
@SuppressWarnings("unchecked")
MergingWindowAssigner<T, TimeWindow> mockAssigner = mock(MergingWindowAssigner.class);
@@ -139,7 +138,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
return mockAssigner;
}
-
static WindowAssigner.WindowAssignerContext anyAssignerContext() {
return Mockito.any();
}
@@ -177,7 +175,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
return Mockito.any();
}
-
static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
doAnswer(new Answer<TriggerResult>() {
@Override
@@ -369,7 +366,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
-
@Test
public void testAssignerIsInvokedOncePerElement() throws Exception {
@@ -540,7 +536,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testEmittingFromWindowFunction(new ProcessingTimeAdaptor());
}
-
private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -1258,7 +1253,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testTimerFiring(new ProcessingTimeAdaptor());
}
-
private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -1382,8 +1376,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
-
-
}
@Test
@@ -2392,7 +2384,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
context.windowState().getState(valueStateDescriptor).update("hello");
return null;
}
@@ -2401,7 +2393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1];
context.windowState().getState(valueStateDescriptor).clear();
return null;
}
@@ -2441,7 +2433,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
context.windowState().getState(valueStateDescriptor).update("hello");
return null;
}
@@ -2481,7 +2473,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
timeAdaptor.verifyCorrectTime(testHarness, context);
return null;
}
@@ -2490,7 +2482,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1];
timeAdaptor.verifyCorrectTime(testHarness, context);
return null;
}
@@ -2520,7 +2512,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
long allowedLatenss,
InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception;
-
private interface TimeDomainAdaptor {
void setIsEventTime(WindowAssigner<?, ?> mockAssigner);
@@ -2535,9 +2526,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
int numTimersOtherDomain(AbstractStreamOperatorTestHarness testHarness);
- void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception;
+ void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception;
- void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception;
+ void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception;
void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
index 9ec1923..904a8b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
import java.net.URL;
@@ -63,8 +65,7 @@ import static org.junit.Assert.fail;
* Tests for checking whether {@link WindowOperator} can restore from snapshots that were done
* using the Flink 1.1 {@link WindowOperator}.
*
- * <p>
- * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1
+ * <p>This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1
* aligned processing-time windows operator.
*
* <p>For regenerating the binary snapshot file you have to run the commented out portion
@@ -85,7 +86,7 @@ public class WindowOperatorFrom11MigrationTest {
@SuppressWarnings("unchecked")
public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -93,7 +94,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -131,7 +132,6 @@ public class WindowOperatorFrom11MigrationTest {
testHarness.close();
*/
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -146,7 +146,6 @@ public class WindowOperatorFrom11MigrationTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
// add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
@@ -167,7 +166,7 @@ public class WindowOperatorFrom11MigrationTest {
@SuppressWarnings("unchecked")
public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -175,7 +174,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -244,7 +243,7 @@ public class WindowOperatorFrom11MigrationTest {
@Test
@SuppressWarnings("unchecked")
public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -253,7 +252,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -334,7 +333,7 @@ public class WindowOperatorFrom11MigrationTest {
@Test
@SuppressWarnings("unchecked")
public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -342,7 +341,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -423,7 +422,7 @@ public class WindowOperatorFrom11MigrationTest {
@Test
@SuppressWarnings("unchecked")
public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -432,7 +431,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +499,7 @@ public class WindowOperatorFrom11MigrationTest {
@Test
@SuppressWarnings("unchecked")
public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -508,7 +507,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -622,7 +621,7 @@ public class WindowOperatorFrom11MigrationTest {
*/
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -631,7 +630,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -725,7 +724,7 @@ public class WindowOperatorFrom11MigrationTest {
testHarness.close();
*/
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -734,7 +733,7 @@ public class WindowOperatorFrom11MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -774,7 +773,6 @@ public class WindowOperatorFrom11MigrationTest {
testHarness.close();
}
-
private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
private static final long serialVersionUID = 1L;
@@ -832,7 +830,7 @@ public class WindowOperatorFrom11MigrationTest {
}
}
- public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -841,7 +839,7 @@ public class WindowOperatorFrom11MigrationTest {
}
}
- public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+ private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
private static final long serialVersionUID = 1L;
private boolean openCalled = false;
@@ -877,7 +875,7 @@ public class WindowOperatorFrom11MigrationTest {
}
- public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
index 0d3a6dc..6e9db1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
@@ -15,13 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-import static org.junit.Assert.fail;
+package org.apache.flink.streaming.runtime.operators.windowing;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -57,9 +53,16 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
+
import org.junit.Ignore;
import org.junit.Test;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
/**
* Tests for checking whether {@link WindowOperator} can restore from snapshots that were done
* using the Flink 1.2 {@link WindowOperator}.
@@ -78,7 +81,7 @@ public class WindowOperatorFrom12MigrationTest {
@Ignore
@Test
public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -86,7 +89,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -122,7 +125,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreSessionWindowsWithCountTrigger() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -130,7 +133,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -157,7 +160,6 @@ public class WindowOperatorFrom12MigrationTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
// add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
@@ -177,7 +179,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -185,7 +187,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -215,7 +217,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception {
- final int SESSION_SIZE = 3;
+ final int sessionSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -223,7 +225,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
- EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -280,7 +282,7 @@ public class WindowOperatorFrom12MigrationTest {
@Ignore
@Test
public void writeReducingEventTimeWindowsSnapshot() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -289,7 +291,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -336,7 +338,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreReducingEventTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -345,7 +347,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -391,7 +393,7 @@ public class WindowOperatorFrom12MigrationTest {
@Ignore
@Test
public void writeApplyEventTimeWindowsSnapshot() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -399,7 +401,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -446,7 +448,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreApplyEventTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -454,7 +456,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +502,7 @@ public class WindowOperatorFrom12MigrationTest {
@Ignore
@Test
public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -509,7 +511,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -550,7 +552,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreReducingProcessingTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -559,7 +561,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -600,7 +602,7 @@ public class WindowOperatorFrom12MigrationTest {
@Ignore
@Test
public void writeApplyProcessingTimeWindowsSnapshot() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -608,7 +610,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -648,7 +650,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreApplyProcessingTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -656,7 +658,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -738,7 +740,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -747,7 +749,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -841,7 +843,7 @@ public class WindowOperatorFrom12MigrationTest {
@Test
public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception {
- final int WINDOW_SIZE = 3;
+ final int windowSize = 3;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -850,7 +852,7 @@ public class WindowOperatorFrom12MigrationTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -892,7 +894,6 @@ public class WindowOperatorFrom12MigrationTest {
testHarness.close();
}
-
private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
private static final long serialVersionUID = 1L;
@@ -950,7 +951,7 @@ public class WindowOperatorFrom12MigrationTest {
}
}
- public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -959,7 +960,7 @@ public class WindowOperatorFrom12MigrationTest {
}
}
- public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+ private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
private static final long serialVersionUID = 1L;
private boolean openCalled = false;
@@ -995,7 +996,7 @@ public class WindowOperatorFrom12MigrationTest {
}
- public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
private static final long serialVersionUID = 1L;
@Override