You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:23 UTC
[04/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 2a6a723..d9fcc12 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -37,11 +37,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
+
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -52,12 +52,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+/**
+ * Tests for {@link StreamSource} operators.
+ */
@SuppressWarnings("serial")
public class StreamSourceOperatorTest {
@@ -86,7 +89,6 @@ public class StreamSourceOperatorTest {
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());
-
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
operator.cancel();
@@ -106,7 +108,6 @@ public class StreamSourceOperatorTest {
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());
-
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
// trigger an async cancel in a bit
@@ -139,7 +140,6 @@ public class StreamSourceOperatorTest {
final StoppableStreamSource<String, InfiniteSource<String>> operator =
new StoppableStreamSource<>(new InfiniteSource<String>());
-
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
operator.stop();
@@ -158,7 +158,6 @@ public class StreamSourceOperatorTest {
final StoppableStreamSource<String, InfiniteSource<String>> operator =
new StoppableStreamSource<>(new InfiniteSource<String>());
-
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
// trigger an async cancel in a bit
@@ -179,7 +178,7 @@ public class StreamSourceOperatorTest {
}
/**
- * Test that latency marks are emitted
+ * Test that latency marks are emitted.
*/
@Test
public void testLatencyMarkEmission() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 6772db4..6e3be03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -24,15 +24,16 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
@@ -65,7 +66,6 @@ public class StreamTaskTimerTest {
assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
-
testHarness.endInput();
testHarness.waitForTaskCompletion();
@@ -109,8 +109,7 @@ public class StreamTaskTimerTest {
long deadline = System.currentTimeMillis() + 20000;
while (errorRef.get() == null &&
ValidatingProcessingTimeCallback.numInSequence < 4 &&
- System.currentTimeMillis() < deadline)
- {
+ System.currentTimeMillis() < deadline) {
Thread.sleep(100);
}
@@ -170,6 +169,9 @@ public class StreamTaskTimerTest {
// ------------------------------------------------------------------------
+ /**
+ * Identity mapper.
+ */
public static class DummyMapFunction<T> implements MapFunction<T, T> {
@Override
public T map(T value) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 9897884..675ffa3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.junit.Test;
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link TestProcessingTimeService}.
+ */
public class TestProcessingTimeServiceTest {
@Test
@@ -90,6 +93,9 @@ public class TestProcessingTimeServiceTest {
// ------------------------------------------------------------------------
+ /**
+ * An {@link AsyncExceptionHandler} storing the handled exception.
+ */
public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
private final AtomicReference<Throwable> errorReference;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index f129c20..51af116 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -28,8 +28,13 @@ import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link TimestampsAndPeriodicWatermarksOperator}.
+ */
public class TimestampsAndPeriodicWatermarksOperatorTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
index 0333e93..a422432 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
@@ -30,6 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link TimestampsAndPunctuatedWatermarksOperator}.
+ */
public class TimestampsAndPunctuatedWatermarksOperatorTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 46d92af..d3fd585 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -27,6 +27,9 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+/**
+ * Test base for {@link GenericWriteAheadSink}.
+ */
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
protected abstract S createSink() throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f7e302..a57dcf1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -68,6 +68,9 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+/**
+ * Tests for {@link AccumulatingProcessingTimeWindowOperator}.
+ */
@SuppressWarnings({"serial"})
@PrepareForTest(InternalIterableWindowFunction.class)
@RunWith(PowerMockRunner.class)
@@ -220,7 +223,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
testHarness.close();
-
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
@@ -278,7 +280,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.setProcessingTime(currentTime);
}
-
List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(numElements, result.size());
@@ -322,7 +323,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.setProcessingTime(currentTime);
}
-
List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(numElements, result.size());
@@ -476,7 +476,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.setProcessingTime(200);
-
List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(6, result.size());
@@ -524,7 +523,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.setProcessingTime(200);
-
List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
assertEquals(6, result.size());
@@ -837,7 +835,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.restore(state);
testHarness.open();
-
// inject again the remaining elements
for (int i = numElementsFirst; i < numElements; i++) {
testHarness.processElement(new StreamRecord<>(i));
@@ -929,7 +926,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
testHarness.restore(state);
testHarness.open();
-
// inject again the remaining elements
for (int i = numElementsFirst; i < numElements; i++) {
testHarness.processElement(new StreamRecord<>(i));
@@ -1040,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// we use a concurrent map here even though there is no concurrency, to
// get "volatile" style access to entries
- static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+ private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
private ValueState<Integer> state;
@@ -1053,9 +1049,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Override
public void process(Integer key,
- Context context,
- Iterable<Integer> values,
- Collector<Integer> out) throws Exception {
+ Context context,
+ Iterable<Integer> values,
+ Collector<Integer> out) throws Exception {
for (Integer i : values) {
// we need to update this state before emitting elements. Else, the test's main
// thread will have received all output elements before the state is updated and
@@ -1093,8 +1089,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
private static StreamTask<?, ?> createMockTaskWithTimer(
- final ProcessingTimeService timerService)
- {
+ final ProcessingTimeService timerService) {
StreamTask<?, ?> mockTask = createMockTask();
when(mockTask.getProcessingTimeService()).thenReturn(timerService);
return mockTask;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 1875bbb..62f4f0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
import org.junit.After;
import org.junit.Test;
@@ -56,6 +56,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+/**
+ * Tests for aligned {@link AggregatingProcessingTimeWindowOperator}.
+ */
@SuppressWarnings("serial")
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@@ -66,9 +69,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
- new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+ new KeySelector<Tuple2<Integer, Integer>, Integer>() {
@Override
- public Integer getKey(Tuple2<Integer,Integer> value) {
+ public Integer getKey(Tuple2<Integer, Integer> value) {
return value.f0;
}
};
@@ -585,7 +588,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
-
OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(op);
@@ -824,7 +826,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
- static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+ private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
private ValueState<Integer> state;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 81a9275..a7c6f47 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
@@ -1244,7 +1245,6 @@ public class AllWindowTranslationTest {
}
-
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithCustomTrigger() throws Exception {
@@ -1461,7 +1461,7 @@ public class AllWindowTranslationTest {
// UDFs
// ------------------------------------------------------------------------
- public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
index 0f65a88..9c14a9f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.Collection;
@@ -62,7 +64,7 @@ public class ContinuousEventTimeTriggerTest {
/**
- * Verify that state <TimeWindow>of separate windows does not leak into other windows.
+ * Verify that state <TimeWindow>of separate windows does not leak into other windows.
*/
@Test
public void testWindowSeparationAndFiring() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
index 16e353b..38dd01d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
index a46572b..23af838 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -15,10 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -28,6 +27,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.Matchers;
@@ -38,27 +39,35 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
- * Tests for {@link EventTimeSessionWindows}
+ * Tests for {@link EventTimeSessionWindows}.
*/
public class EventTimeSessionWindowsTest extends TestLogger {
@Test
public void testWindowAssignment() {
- final int SESSION_GAP = 5000;
+ final int sessionGap = 5000;
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
- EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP));
+ EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(sessionGap));
- assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
- assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
- assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+ assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap)));
+ assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+ assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
}
@Test
@@ -138,16 +147,16 @@ public class EventTimeSessionWindowsTest extends TestLogger {
public void testTimeUnits() {
// sanity check with one other time unit
- final int SESSION_GAP = 5000;
+ final int sessionGap = 5000;
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
- EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000));
+ EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(sessionGap / 1000));
- assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
- assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
- assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+ assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap)));
+ assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+ assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
index 2d93ac0..2bcc192 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
index 7af4506..a89aec0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index e5d3ef0..8d65bb4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.Test;
@@ -58,18 +60,21 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Tests for {@link EvictingWindowOperator}.
+ */
public class EvictingWindowOperatorTest {
/**
- * Tests CountEvictor evictAfter behavior
+ * Tests CountEvictor evictAfter behavior.
* @throws Exception
*/
@Test
public void testCountEvictorEvictAfter() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int WINDOW_SIZE = 4;
- final int TRIGGER_COUNT = 2;
- final boolean EVICT_AFTER = true;
+ final int windowSize = 4;
+ final int triggerCount = 2;
+ final boolean evictAfter = true;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -80,7 +85,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -88,16 +92,14 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
- CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+ CountTrigger.of(triggerCount),
+ CountEvictor.of(windowSize, evictAfter),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -114,8 +116,6 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -139,18 +139,17 @@ public class EvictingWindowOperatorTest {
testHarness.close();
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
}
/**
- * Tests TimeEvictor evictAfter behavior
+ * Tests TimeEvictor evictAfter behavior.
* @throws Exception
*/
@Test
public void testTimeEvictorEvictAfter() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int TRIGGER_COUNT = 2;
- final boolean EVICT_AFTER = true;
+ final int triggerCount = 2;
+ final boolean evictAfter = true;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -161,7 +160,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -169,12 +167,11 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
- TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+ CountTrigger.of(triggerCount),
+ TimeEvictor.of(Time.seconds(2), evictAfter),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -194,13 +191,10 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
@@ -218,14 +212,14 @@ public class EvictingWindowOperatorTest {
}
/**
- * Tests TimeEvictor evictBefore behavior
+ * Tests TimeEvictor evictBefore behavior.
* @throws Exception
*/
@Test
public void testTimeEvictorEvictBefore() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int TRIGGER_COUNT = 2;
- final int WINDOW_SIZE = 4;
+ final int triggerCount = 2;
+ final int windowSize = 4;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -236,20 +230,18 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
+ CountTrigger.of(triggerCount),
TimeEvictor.of(Time.seconds(2)),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -270,13 +262,10 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500));
@@ -290,19 +279,18 @@ public class EvictingWindowOperatorTest {
testHarness.close();
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
}
/**
- * Tests time evictor, if no timestamp information in the StreamRecord
- * No element will be evicted from the window
+ * Tests the time evictor when the StreamRecord carries no timestamp information.
+ * No element will be evicted from the window.
* @throws Exception
*/
@Test
public void testTimeEvictorNoTimestamp() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int TRIGGER_COUNT = 2;
- final boolean EVICT_AFTER = true;
+ final int triggerCount = 2;
+ final boolean evictAfter = true;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -313,7 +301,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -321,12 +308,11 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
- TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+ CountTrigger.of(triggerCount),
+ TimeEvictor.of(Time.seconds(2), evictAfter),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -345,13 +331,10 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
@@ -365,19 +348,18 @@ public class EvictingWindowOperatorTest {
testHarness.close();
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
}
/**
- * Tests DeltaEvictor, evictBefore behavior
+ * Tests DeltaEvictor, evictBefore behavior.
* @throws Exception
*/
@Test
public void testDeltaEvictorEvictBefore() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int TRIGGER_COUNT = 2;
- final boolean EVICT_AFTER = false;
- final int THRESHOLD = 2;
+ final int triggerCount = 2;
+ final boolean evictAfter = false;
+ final int threshold = 2;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -388,7 +370,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -396,18 +377,16 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
- DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+ CountTrigger.of(triggerCount),
+ DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
return newDataPoint.f1 - oldDataPoint.f1;
}
- }, EVICT_AFTER),
+ }, evictAfter),
0,
null /* late data output tag */);
-
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -447,15 +426,15 @@ public class EvictingWindowOperatorTest {
}
/**
- * Tests DeltaEvictor, evictAfter behavior
+ * Tests DeltaEvictor, evictAfter behavior.
* @throws Exception
*/
@Test
public void testDeltaEvictorEvictAfter() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int TRIGGER_COUNT = 2;
- final boolean EVICT_AFTER = true;
- final int THRESHOLD = 2;
+ final int triggerCount = 2;
+ final boolean evictAfter = true;
+ final int threshold = 2;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -466,7 +445,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -474,18 +452,16 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(TRIGGER_COUNT),
- DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+ CountTrigger.of(triggerCount),
+ DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
return newDataPoint.f1 - oldDataPoint.f1;
}
- }, EVICT_AFTER),
+ }, evictAfter),
0,
null /* late data output tag */);
-
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -528,8 +504,8 @@ public class EvictingWindowOperatorTest {
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
- final int WINDOW_SIZE = 4;
- final int WINDOW_SLIDE = 2;
+ final int windowSize = 4;
+ final int windowSlide = 2;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -540,7 +516,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -552,12 +527,11 @@ public class EvictingWindowOperatorTest {
new SumReducer(),
// on some versions of Java we seem to need the explicit type
new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
- CountTrigger.of(WINDOW_SLIDE),
- CountEvictor.of(WINDOW_SIZE),
+ CountTrigger.of(windowSlide),
+ CountEvictor.of(windowSize),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -580,8 +554,6 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -604,12 +576,11 @@ public class EvictingWindowOperatorTest {
public void testCountTriggerWithApply() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int WINDOW_SIZE = 4;
- final int WINDOW_SLIDE = 2;
+ final int windowSize = 4;
+ final int windowSlide = 2;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
@@ -617,7 +588,6 @@ public class EvictingWindowOperatorTest {
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
-
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
@@ -625,12 +595,11 @@ public class EvictingWindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
- CountTrigger.of(WINDOW_SLIDE),
- CountEvictor.of(WINDOW_SIZE),
+ CountTrigger.of(windowSlide),
+ CountEvictor.of(windowSize),
0,
null /* late data output tag */);
-
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -653,8 +622,6 @@ public class EvictingWindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -679,7 +646,7 @@ public class EvictingWindowOperatorTest {
public void testTumblingWindowWithApply() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
- final int WINDOW_SIZE = 4;
+ final int windowSize = 4;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
@@ -691,14 +658,14 @@ public class EvictingWindowOperatorTest {
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
EventTimeTrigger.create(),
- CountEvictor.of(WINDOW_SIZE),
+ CountEvictor.of(windowSize),
0,
null /* late data output tag */);
@@ -731,7 +698,6 @@ public class EvictingWindowOperatorTest {
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
expectedOutput.add(new Watermark(3999));
-
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new EvictingWindowOperatorTest.ResultSortComparator());
testHarness.close();
@@ -741,10 +707,9 @@ public class EvictingWindowOperatorTest {
// UDFs
// ------------------------------------------------------------------------
- public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
-
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
@@ -752,7 +717,7 @@ public class EvictingWindowOperatorTest {
}
}
- public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+ private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
private static final long serialVersionUID = 1L;
private boolean openCalled = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
index 37fad7e..9292bfb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,15 +24,18 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
/**
- * Tests for {@link GlobalWindows}
+ * Tests for {@link GlobalWindows}.
*/
public class GlobalWindowsTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
index 8786c4e..8eb26ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link KeyMap}.
+ */
public class KeyMapPutIfAbsentTest {
@Test
@@ -38,7 +41,7 @@ public class KeyMapPutIfAbsentTest {
factory.set(2 * i + 1);
map.putIfAbsent(i, factory);
- assertEquals(i+1, map.size());
+ assertEquals(i + 1, map.size());
assertTrue(map.getCurrentTableCapacity() > map.size());
assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
assertTrue(map.size() <= map.getRehashThreshold());
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
index 5b59bea..522c691 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
@@ -29,6 +29,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link KeyMap}.
+ */
public class KeyMapPutTest {
@Test
@@ -41,7 +44,7 @@ public class KeyMapPutTest {
for (int i = 0; i < numElements; i++) {
map.put(i, 2 * i + 1);
- assertEquals(i+1, map.size());
+ assertEquals(i + 1, map.size());
assertTrue(map.getCurrentTableCapacity() > map.size());
assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
assertTrue(map.size() <= map.getRehashThreshold());
@@ -72,7 +75,6 @@ public class KeyMapPutTest {
assertEquals(numElements, numContained);
assertEquals(numElements, bitset.cardinality());
-
assertEquals(numElements, map.size());
assertEquals(numElements, map.traverseAndCountElements());
assertEquals(1 << 21, map.getCurrentTableCapacity());
@@ -91,18 +93,18 @@ public class KeyMapPutTest {
final int numElements = 1000000;
for (int i = 0; i < numElements; i++) {
- Integer put = map.put(i, 2*i+1);
+ Integer put = map.put(i, 2 * i + 1);
assertNull(put);
}
for (int i = 0; i < numElements; i += 3) {
- Integer put = map.put(i, 2*i);
+ Integer put = map.put(i, 2 * i);
assertNotNull(put);
- assertEquals(2*i+1, put.intValue());
+ assertEquals(2 * i + 1, put.intValue());
}
for (int i = 0; i < numElements; i++) {
- int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
+ int expected = (i % 3 == 0) ? (2 * i) : (2 * i + 1);
assertEquals(expected, map.get(i).intValue());
}
@@ -111,14 +113,13 @@ public class KeyMapPutTest {
assertEquals(1 << 21, map.getCurrentTableCapacity());
assertTrue(map.getLongestChainLength() <= 7);
-
BitSet bitset = new BitSet();
int numContained = 0;
for (KeyMap.Entry<Integer, Integer> entry : map) {
numContained++;
int key = entry.getKey();
- int expected = key % 3 == 0 ? (2*key) : (2*key+1);
+ int expected = key % 3 == 0 ? (2 * key) : (2 * key + 1);
assertEquals(expected, entry.getValue().intValue());
assertFalse(bitset.get(key));
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
index c7fb108..a442466 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
@@ -24,8 +24,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Random;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KeyMap}.
+ */
public class KeyMapTest {
@Test
@@ -218,7 +225,6 @@ public class KeyMapTest {
nextKeyValue += keyRnd.nextInt(maxStride) + 1;
}
-
// check that all maps contain the total number of elements
int numContained = 0;
for (KeyMap<?, ?> map : maps) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index aa9cb91..0c45d03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,11 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.Matchers;
@@ -44,9 +45,16 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations
@@ -152,7 +160,6 @@ public class MergingWindowSetTest {
assertEquals(new TimeWindow(11, 14), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
assertFalse(mergeFunction.hasMerged());
-
assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(11, 14)));
@@ -179,7 +186,6 @@ public class MergingWindowSetTest {
assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
assertFalse(mergeFunction.hasMerged());
-
assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(10, 15)));
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
index 461b5fc..ceda3b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -15,19 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.Matchers;
@@ -36,13 +37,23 @@ import java.util.Collection;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
- * Tests for {@link ProcessingTimeSessionWindows}
+ * Tests for {@link ProcessingTimeSessionWindows}.
*/
public class ProcessingTimeSessionWindowsTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
index a0c2347..791eb42 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
-import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
index 4302d4d..7139a44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -22,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
@@ -35,7 +37,12 @@ import static org.apache.flink.streaming.runtime.operators.windowing.WindowOpera
import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link PurgingTrigger}.
@@ -51,7 +58,7 @@ public class PurgingTriggerTest {
for (Method triggerMethod : Trigger.class.getDeclaredMethods()) {
// try retrieving the method, this will throw an exception if we can't find it
- PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes());
+ PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(), triggerMethod.getParameterTypes());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index ff1cbdf..62c484d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -15,19 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
-import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -50,11 +40,24 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
+
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* These tests verify that {@link WindowOperator} correctly interacts with the other windowing
* components: {@link WindowAssigner},
@@ -72,7 +75,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
-
ReducingStateDescriptor<Integer> intReduceSumDescriptor =
new ReducingStateDescriptor<>(
"int-reduce",
@@ -89,7 +91,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
final ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
-
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
@@ -252,7 +253,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
ListStateDescriptor<Integer> intListDescriptor =
new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
-
@SuppressWarnings("unchecked")
WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>(
assigner,
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
index 050178b..33f4747 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.typeutils.TypeSerializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 4599d19..95a8314 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
/**
- * Tests for {@link SlidingEventTimeWindows}
+ * Tests for {@link SlidingEventTimeWindows}.
*/
public class SlidingEventTimeWindowsTest extends TestLogger {
@@ -148,7 +151,6 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
}
-
try {
SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
fail("should fail");
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 20a9924..69b628a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.windowing;
+package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Tests for {@link SlidingProcessingTimeWindows}
+ * Tests for {@link SlidingProcessingTimeWindows}.
*/
public class SlidingProcessingTimeWindowsTest extends TestLogger {
@@ -157,7 +161,6 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
}
-
try {
SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
fail("should fail");