You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:25 UTC
[06/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 6dcb56b..e14430e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -17,6 +17,7 @@
*/
// We have it in this package because we could not mock the methods otherwise
+
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -92,7 +94,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
-
final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() {
@Override
public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -178,7 +179,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
return true;
}
- public static class InputValue<T> {
+ private static class InputValue<T> {
private Object elementOrEvent;
private boolean isStreamEnd;
private boolean isStreamRecord;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 7dc889c..acb531d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -17,23 +17,16 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -41,8 +34,18 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AggregationFunction}.
+ */
public class AggregationFunctionTest {
@Test
@@ -188,9 +191,9 @@ public class AggregationFunctionTest {
// preparing expected outputs
List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
- Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
- Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
- Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
+ Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2),
+ Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2));
List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
@@ -198,9 +201,9 @@ public class AggregationFunctionTest {
Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0));
List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
@@ -209,7 +212,7 @@ public class AggregationFunctionTest {
// some necessary boiler plate
TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
- .getForObject(Tuple3.of(0,0,0));
+ .getForObject(Tuple3.of(0, 0, 0));
ExecutionConfig config = new ExecutionConfig();
@@ -351,6 +354,9 @@ public class AggregationFunctionTest {
return inputList;
}
+ /**
+ * POJO.
+ */
public static class MyPojo implements Serializable {
private static final long serialVersionUID = 1L;
@@ -380,6 +386,9 @@ public class AggregationFunctionTest {
}
}
+ /**
+ * POJO.
+ */
public static class MyPojo3 implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index e5fbfda..ea0e139 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.api;
-import java.lang.reflect.Method;
-import java.util.List;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -57,18 +54,18 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -80,8 +77,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import static org.junit.Assert.*;
+import java.lang.reflect.Method;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for {@link DataStream}.
+ */
@SuppressWarnings("serial")
public class DataStreamTest {
@@ -142,7 +149,7 @@ public class DataStreamTest {
}
}).setParallelism(2);
- DataStream<Long> unionDifferingParallelism= input2.union(input3).map(new MapFunction<Long, Long>() {
+ DataStream<Long> unionDifferingParallelism = input2.union(input3).map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return null;
@@ -578,6 +585,7 @@ public class DataStreamTest {
@Override
public void flatMap1(Long value, Collector<Long> out) throws Exception {
}
+
@Override
public void flatMap2(Long value, Collector<Long> out) throws Exception {
}
@@ -744,7 +752,6 @@ public class DataStreamTest {
assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
}
-
@Test
public void operatorTest() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -761,7 +768,6 @@ public class DataStreamTest {
map.addSink(new DiscardingSink<Integer>());
assertEquals(mapFunction, getFunctionForDataStream(map));
-
FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
private static final long serialVersionUID = 1L;
@@ -1090,7 +1096,7 @@ public class DataStreamTest {
expectedException.expect(InvalidProgramException.class);
expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
- input.keyBy(new KeySelector<Tuple2<Integer[],String>, Tuple2<Integer[],String>>() {
+ input.keyBy(new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>() {
@Override
public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
return value;
@@ -1121,6 +1127,9 @@ public class DataStreamTest {
});
}
+ /**
+ * POJO without hashCode.
+ */
public static class POJOWithoutHashCode {
private int[] id;
@@ -1140,6 +1149,9 @@ public class DataStreamTest {
}
}
+ /**
+ * POJO with hashCode.
+ */
public static class POJOWithHashCode extends POJOWithoutHashCode {
public POJOWithHashCode() {
@@ -1244,7 +1256,7 @@ public class DataStreamTest {
}
}
- public static class CustomPOJO {
+ private static class CustomPOJO {
private String s;
private int i;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index d6fcd61..b231bea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -23,15 +23,19 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Tests for {@link RestartStrategies}.
+ */
public class RestartStrategyTest extends TestLogger {
/**
* Tests that in a streaming use case where checkpointing is enabled, a
* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
- * strategy has been specified
+ * strategy has been specified.
*/
@Test
public void testAutomaticRestartingWhenCheckpointing() throws Exception {
@@ -53,7 +57,7 @@ public class RestartStrategyTest extends TestLogger {
/**
* Checks that in a streaming use case where checkpointing is enabled and the number
- * of execution retries is set to 0, restarting is deactivated
+ * of execution retries is set to 0, restarting is deactivated.
*/
@Test
public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() throws Exception {
@@ -94,7 +98,7 @@ public class RestartStrategyTest extends TestLogger {
Assert.assertNotNull(restartStrategy);
Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
- Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getRestartAttempts());
- Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds());
+ Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
+ Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index dd4ff33..cabc7a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -17,18 +17,22 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.SourceFunctionUtil;
+
import org.junit.Test;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SourceFunction}.
+ */
public class SourceFunctionTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index ff1eaaa..91cbe13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
+
import org.junit.Assert;
import org.junit.Test;
@@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link StreamExecutionEnvironment}.
+ */
public class StreamExecutionEnvironmentTest {
@Test
@@ -143,7 +147,6 @@ public class StreamExecutionEnvironmentTest {
}
};
-
SingleOutputStreamOperator<Object> operator =
env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
@@ -227,7 +230,6 @@ public class StreamExecutionEnvironmentTest {
// Utilities
/////////////////////////////////////////////////////////////
-
private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
StreamGraph streamGraph = env.getStreamGraph();
@@ -242,7 +244,7 @@ public class StreamExecutionEnvironmentTest {
return (SourceFunction<T>) operator.getUserFunction();
}
- public static class DummySplittableIterator<T> extends SplittableIterator<T> {
+ private static class DummySplittableIterator<T> extends SplittableIterator<T> {
private static final long serialVersionUID = 1312752876092210499L;
@SuppressWarnings("unchecked")
@@ -272,7 +274,7 @@ public class StreamExecutionEnvironmentTest {
}
}
- public static class ParentClass {
+ private static class ParentClass {
int num;
String string;
public ParentClass(int num, String string) {
@@ -281,7 +283,7 @@ public class StreamExecutionEnvironmentTest {
}
}
- public static class SubClass extends ParentClass{
+ private static class SubClass extends ParentClass{
public SubClass(int num, String string) {
super(num, string);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 83fb2f7..5baa980 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -17,10 +17,6 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -35,6 +31,12 @@ import org.apache.flink.util.Collector;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TypeFill}.
+ */
@SuppressWarnings("serial")
public class TypeFillTest {
@@ -47,7 +49,6 @@ public class TypeFillTest {
fail();
} catch (Exception ignored) {}
-
DataStream<Long> source = env.generateSequence(1, 10);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 51b9d9a..4d201f4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
import org.junit.Assert;
import org.junit.Test;
@@ -29,6 +30,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+/**
+ * Tests for {@link ListCheckpointed}.
+ */
public class ListCheckpointedTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index 3194f9e..a2f8ed6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -17,17 +17,20 @@
package org.apache.flink.streaming.api.collector;
-import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link OutputSelector}.
+ */
public class OutputSelectorTest {
static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
index d346fdc..c053598 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link Flip6LocalStreamEnvironment}.
+ */
@SuppressWarnings("serial")
public class LocalStreamEnvironmentITCase extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
index 2e92807..16f87ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
@@ -19,11 +19,15 @@
package org.apache.flink.streaming.api.functions;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link AscendingTimestampExtractor}.
+ */
public class AscendingTimestampExtractorTest {
@Test
@@ -87,8 +91,6 @@ public class AscendingTimestampExtractorTest {
assertEquals(500L, extractor.extractTimestamp(500L, 0L));
assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L));
-
-
}
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java
index d9a3812..91931d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java
@@ -15,18 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.api.functions;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link BoundedOutOfOrdernessTimestampExtractor}.
+ */
public class BoundedOutOfOrdernessTimestampExtractorTest {
-
@Test
public void testInitializationAndRuntime() {
Time maxAllowedLateness = Time.milliseconds(10L);
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index 3744eb9..9268ef7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -41,7 +41,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}.
@@ -134,12 +136,12 @@ public class FromElementsFunctionTest {
@Test
public void testCheckpointAndRestore() {
try {
- final int NUM_ELEMENTS = 10000;
+ final int numElements = 10000;
- List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS);
- List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS);
+ List<Integer> data = new ArrayList<Integer>(numElements);
+ List<Integer> result = new ArrayList<Integer>(numElements);
- for (int i = 0; i < NUM_ELEMENTS; i++) {
+ for (int i = 0; i < numElements; i++) {
data.add(i);
}
@@ -171,7 +173,7 @@ public class FromElementsFunctionTest {
Thread.sleep(1000);
// make a checkpoint
- List<Integer> checkpointData = new ArrayList<>(NUM_ELEMENTS);
+ List<Integer> checkpointData = new ArrayList<>(numElements);
OperatorStateHandles handles = null;
synchronized (ctx.getCheckpointLock()) {
handles = testHarness.snapshot(566, System.currentTimeMillis());
@@ -215,7 +217,7 @@ public class FromElementsFunctionTest {
// Test Types
// ------------------------------------------------------------------------
- public static class MyPojo {
+ private static class MyPojo {
public long val1;
public int val2;
@@ -244,7 +246,7 @@ public class FromElementsFunctionTest {
}
}
- public static class SerializationErrorType implements Value {
+ private static class SerializationErrorType implements Value {
private static final long serialVersionUID = -6037206294939421807L;
@@ -259,7 +261,7 @@ public class FromElementsFunctionTest {
}
}
- public static class DeserializeTooMuchType implements Value {
+ private static class DeserializeTooMuchType implements Value {
private static final long serialVersionUID = -6037206294939421807L;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
index a976453..c1ca9a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
@@ -19,10 +19,15 @@
package org.apache.flink.streaming.api.functions;
import org.apache.flink.streaming.api.watermark.Watermark;
+
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link IngestionTimeExtractor}.
+ */
public class IngestionTimeExtractorTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index 55a0e7f..ee666df 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -36,7 +36,6 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
private final long delay;
-
public ListSourceContext(List<T> target) {
this(target, 0L);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index 9030e9d..de9f1c7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
import org.junit.Assert;
import org.junit.Test;
@@ -35,6 +36,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+/**
+ * Tests for {@link StatefulSequenceSource}.
+ */
public class StatefulSequenceSourceTest {
@Test
@@ -190,7 +194,7 @@ public class StatefulSequenceSourceTest {
private final List<Long> localOutput;
public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLatch latchToWait,
- ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
+ ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
this.name = name;
this.lock = new Object();
this.latchToTrigger = latchToTrigger;
@@ -225,7 +229,6 @@ public class StatefulSequenceSourceTest {
}
}
-
@Override
public void emitWatermark(Watermark mark) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 562883d..224b376 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -32,8 +32,8 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.junit.Test;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Test cases for {@link RichAsyncFunction}
+ * Test cases for {@link RichAsyncFunction}.
*/
public class RichAsyncFunctionTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
index d3a9d3d..afd1101 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
@@ -21,9 +21,13 @@ package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
+
import org.junit.Test;
import org.mockito.Mockito;
+/**
+ * Tests for {@link OutputFormatSinkFunction}.
+ */
public class OutputFormatSinkFunctionTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 877e707..63e83d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,11 +18,12 @@
package org.apache.flink.streaming.api.functions.sink;
-import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.IOUtils;
import org.junit.Test;
import java.io.BufferedReader;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
index d81b440..08985bc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -30,8 +30,8 @@ public class FileMonitoringFunctionTest {
@Test
public void testForEmptyLocation() throws Exception {
- final FileMonitoringFunction fileMonitoringFunction
- = new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
+ final FileMonitoringFunction fileMonitoringFunction =
+ new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
new Thread() {
@Override
@@ -61,7 +61,9 @@ public class FileMonitoringFunctionTest {
public void markAsTemporarilyIdle() {}
@Override
- public Object getCheckpointLock() { return null; }
+ public Object getCheckpointLock() {
+ return null;
+ }
@Override
public void close() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index bb80228..b99119e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -34,12 +34,16 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
+
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
+/**
+ * Tests for {@link InputFormatSourceFunction}.
+ */
public class InputFormatSourceFunctionTest {
@Test
@@ -81,8 +85,7 @@ public class InputFormatSourceFunctionTest {
Assert.assertTrue(!format.isInputFormatOpen);
}
-
- private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
+ private static class LifeCycleTestInputFormat extends RichInputFormat<Integer, InputSplit> {
private static final long serialVersionUID = 7408902249499583273L;
private boolean isConfigured = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 85fa30b..3d14544 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -18,10 +18,9 @@
package org.apache.flink.streaming.api.functions.source;
-import org.apache.commons.io.IOUtils;
-
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.commons.io.IOUtils;
import org.junit.Test;
import java.io.EOFException;
@@ -29,7 +28,8 @@ import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
@@ -38,7 +38,6 @@ public class SocketTextStreamFunctionTest {
private static final String LOCALHOST = "127.0.0.1";
-
@Test
public void testSocketSourceSimpleOutput() throws Exception {
ServerSocket server = new ServerSocket(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
index c98a659..a110af0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-import static org.junit.Assert.assertEquals;
-
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple10;
@@ -45,10 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple;
+
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ArrayFromTuple}.
+ */
public class ArrayFromTupleTest {
private String[] testStrings;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
index 3b098c3..bb2a3c3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
@@ -17,12 +17,16 @@
package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.java.tuple.Tuple2;
+
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConcatenatedExtract}.
+ */
public class ConcatenatedExtractTest {
private String[] testStringArray1 = { "1", "2", "3" };
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
index d274f4e..a71c0b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
@@ -21,6 +21,9 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link FieldFromArray}.
+ */
public class FieldFromArrayTest {
String[] testStringArray = { "0", "1", "2", "3", "4" };
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
index c05f281..89ccc45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
@@ -43,11 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.tuple.Tuple9;
+
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link FieldFromTuple}.
+ */
public class FieldFromTupleTest {
private String[] testStrings;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
index 7a9a716..4404cd8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
@@ -17,11 +17,13 @@
package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FieldsFromArray}.
+ */
public class FieldsFromArrayTest {
String[] testStringArray = { "0", "1", "2", "3", "4" };
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
index 025ed8a..f46d5eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple10;
@@ -45,10 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple;
+
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FieldsFromTuple}.
+ */
public class FieldsFromTupleTest {
private double[] testDouble;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index cac59ae..6c8d5d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -17,20 +17,21 @@
package org.apache.flink.streaming.api.graph;
-import static org.junit.Assert.*;
-
-import java.util.List;
-
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
/**
* This verifies that slot sharing groups are correctly forwarded from user job to JobGraph.
*
@@ -46,7 +47,9 @@ public class SlotAllocationTest extends TestLogger {
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
@Override
- public boolean filter(Long value) { return false; }
+ public boolean filter(Long value) {
+ return false;
+ }
};
env.generateSequence(1, 10)
@@ -89,7 +92,9 @@ public class SlotAllocationTest extends TestLogger {
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
@Override
- public boolean filter(Long value) { return false; }
+ public boolean filter(Long value) {
+ return false;
+ }
};
DataStream<Long> src1 = env.generateSequence(1, 10);
@@ -127,7 +132,9 @@ public class SlotAllocationTest extends TestLogger {
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
@Override
- public boolean filter(Long value) { return false; }
+ public boolean filter(Long value) {
+ return false;
+ }
};
DataStream<Long> src1 = env.generateSequence(1, 10).slotSharingGroup("group-1");
@@ -144,7 +151,6 @@ public class SlotAllocationTest extends TestLogger {
assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
}
-
@Test
public void testCoOperation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5fdacd4..8149d24 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -59,8 +60,7 @@ public class StreamGraphGeneratorTest {
/**
* This tests whether virtual Transformations behave correctly.
*
- * <p>
- * Verifies that partitioning, output selector, selected names are correctly set in the
+ * <p>Verifies that partitioning, output selector, selected names are correctly set in the
* StreamGraph when they are intermixed.
*/
@Test
@@ -111,7 +111,6 @@ public class StreamGraphGeneratorTest {
.select("even")
.shuffle();
-
SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
.map(new NoOpIntMap());
@@ -143,7 +142,7 @@ public class StreamGraphGeneratorTest {
/**
* This tests whether virtual Transformations behave correctly.
*
- * Checks whether output selector, partitioning works correctly when applied on a union.
+ * <p>Checks whether output selector, partitioning works correctly when applied on a union.
*/
@Test
public void testVirtualTransformations2() throws Exception {
@@ -270,7 +269,7 @@ public class StreamGraphGeneratorTest {
}
/**
- * Tests that the global and operator-wide max parallelism setting is respected
+ * Tests that the global and operator-wide max parallelism setting is respected.
*/
@Test
public void testMaxParallelismForwarding() {
@@ -387,13 +386,13 @@ public class StreamGraphGeneratorTest {
env.getConfig().setMaxParallelism(maxParallelism);
DataStream<Integer> keyedResult = input1.connect(input2).keyBy(
- new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = -6908614081449363419L;
+ new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = -6908614081449363419L;
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
},
new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 3195683453223164931L;
@@ -501,6 +500,5 @@ public class StreamGraphGeneratorTest {
return value;
}
- };
-
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6c5baca..6dd7de7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -47,6 +47,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link StreamingJobGraphGenerator}.
+ */
@SuppressWarnings("serial")
public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -158,7 +161,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
/**
* Verifies that the resources are merged correctly for chained operators (covers source and sink cases)
- * when generating job graph
+ * when generating job graph.
*/
@Test
public void testResourcesForChainedSourceSink() throws Exception {
@@ -230,7 +233,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
/**
* Verifies that the resources are merged correctly for chained operators (covers middle chaining and iteration cases)
- * when generating job graph
+ * when generating job graph.
*/
@Test
public void testResourcesForIteration() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 67004ea..798c81f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -35,21 +35,19 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Assert;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -261,18 +259,18 @@ public class AbstractStreamOperatorTest {
*/
@Test
public void testStateAndTimerStateShufflingScalingUp() throws Exception {
- final int MAX_PARALLELISM = 10;
+ final int maxParallelism = 10;
// first get two keys that will fall into different key-group ranges that go
// to different operator subtasks when we restore
// get two sub key-ranges so that we can restore two ranges separately
- KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1);
- KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+ KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
+ KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);
// get two different keys, one per sub range
- int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM);
- int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM);
+ int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
+ int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);
TestOperator testOperator = new TestOperator();
@@ -281,7 +279,7 @@ public class AbstractStreamOperatorTest {
testOperator,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
1, /* num subtasks */
0 /* subtask index */);
@@ -312,7 +310,7 @@ public class AbstractStreamOperatorTest {
testOperator1,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
2, /* num subtasks */
0 /* subtask index */);
@@ -352,7 +350,7 @@ public class AbstractStreamOperatorTest {
testOperator2,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
2, /* num subtasks */
1 /* subtask index */);
@@ -383,18 +381,18 @@ public class AbstractStreamOperatorTest {
@Test
public void testStateAndTimerStateShufflingScalingDown() throws Exception {
- final int MAX_PARALLELISM = 10;
+ final int maxParallelism = 10;
// first get two keys that will fall into different key-group ranges that go
// to different operator subtasks when we restore
// get two sub key-ranges so that we can restore two ranges separately
- KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1);
- KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+ KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1);
+ KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);
// get two different keys, one per sub range
- int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM);
- int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM);
+ int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
+ int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);
TestOperator testOperator1 = new TestOperator();
@@ -403,7 +401,7 @@ public class AbstractStreamOperatorTest {
testOperator1,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
2, /* num subtasks */
0 /* subtask index */);
@@ -420,11 +418,10 @@ public class AbstractStreamOperatorTest {
testOperator2,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
2, /* num subtasks */
1 /* subtask index */);
-
testHarness2.setup();
testHarness2.open();
@@ -458,7 +455,7 @@ public class AbstractStreamOperatorTest {
testOperator3,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
- MAX_PARALLELISM,
+ maxParallelism,
1, /* num subtasks */
0 /* subtask index */);
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index d331171..e8b4c9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
+
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 90470ac..7dba4af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -53,12 +53,16 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for {@link FoldApplyProcessWindowFunction}.
+ */
public class FoldApplyProcessWindowFunctionTest {
/**
@@ -290,7 +294,7 @@ public class FoldApplyProcessWindowFunctionTest {
Assert.assertEquals(expected, result);
}
- public static class DummyKeyedStateStore implements KeyedStateStore {
+ private static class DummyKeyedStateStore implements KeyedStateStore {
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
@@ -318,7 +322,7 @@ public class FoldApplyProcessWindowFunctionTest {
}
}
- public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index fecd440..7cf18dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -37,12 +37,16 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.util.Collector;
-import org.junit.Test;
+
import org.junit.Assert;
+import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for {@link FoldApplyWindowFunction}.
+ */
public class FoldApplyWindowFunctionTest {
/**
@@ -138,7 +142,7 @@ public class FoldApplyWindowFunctionTest {
Assert.assertEquals(expected, result);
}
- public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 680f2ac..5a4f1c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -46,7 +48,13 @@ import java.util.Set;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* Tests for {@link HeapInternalTimerService}.
@@ -111,7 +119,7 @@ public class HeapInternalTimerServiceTest {
for (int i = 0; i < totalNoOfTimers; i++) {
// create the timer to be registered
- InternalTimer<Integer, String> timer = new InternalTimer<>(10 + i, i, "hello_world_"+ i);
+ InternalTimer<Integer, String> timer = new InternalTimer<>(10 + i, i, "hello_world_" + i);
int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups);
// add it in the adequate expected set of timers per keygroup
@@ -298,7 +306,6 @@ public class HeapInternalTimerServiceTest {
assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L));
}
-
@Test
public void testCurrentProcessingTime() throws Exception {
@@ -673,7 +680,6 @@ public class HeapInternalTimerServiceTest {
@SuppressWarnings("unchecked")
Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
-
TestKeyContext keyContext1 = new TestKeyContext();
TestKeyContext keyContext2 = new TestKeyContext();
@@ -696,7 +702,6 @@ public class HeapInternalTimerServiceTest {
subKeyGroupRange2,
maxParallelism);
-
processingTimeService1.setCurrentTime(10);
timerService1.advanceWatermark(10);
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index 32953fc..696acfa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.api.operators;
+package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index f57eed1..5a7e69e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.util.concurrent.RunnableFuture;
@@ -29,6 +30,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
+/**
+ * Tests for {@link OperatorSnapshotResult}.
+ */
public class OperatorSnapshotResultTest extends TestLogger {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index c37fe48..35ab00c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.api.operators;
+package org.apache.flink.streaming.api.operators;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -92,6 +93,7 @@ public class ProcessOperatorTest extends TestLogger {
testHarness.close();
}
+
private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 9b78b08..1e9b942 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.streaming.api.operators;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -43,17 +40,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.util.Collector;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.junit.Test;
import java.io.File;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
/**
* Various tests around the proper passing of state descriptors to the operators
* and their serialization.
*
- * The tests use an arbitrary generic type to validate the behavior.
+ * <p>The tests use an arbitrary generic type to validate the behavior.
*/
@SuppressWarnings("serial")
public class StateDescriptorPassingTest {
@@ -266,7 +265,7 @@ public class StateDescriptorPassingTest {
assertTrue(descr instanceof ListStateDescriptor);
- ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>)descr;
+ ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>) descr;
// this would be the first statement to fail if state descriptors were not properly initialized
TypeSerializer<?> serializer = listDescr.getSerializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 3745031..1ba2e77 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.util.Preconditions;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -54,6 +55,9 @@ import java.util.Set;
import static org.mockito.Mockito.mock;
+/**
+ * Tests for {@link StateInitializationContextImpl}.
+ */
public class StateInitializationContextImplTest {
static final int NUM_HANDLES = 10;
@@ -67,7 +71,6 @@ public class StateInitializationContextImplTest {
@Before
public void setUp() throws Exception {
-
this.writtenKeyGroups = 0;
this.writtenOperatorStates = new HashSet<>();
@@ -204,7 +207,6 @@ public class StateInitializationContextImplTest {
int stopCount = NUM_HANDLES / 2;
boolean isClosed = false;
-
try {
for (KeyGroupStatePartitionStreamProvider stateStreamProvider
: initializationContext.getRawKeyedStateInputs()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 277ced5..099f1f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +41,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
+/**
+ * Tests for {@link StateSnapshotContextSynchronousImpl}.
+ */
public class StateSnapshotContextSynchronousImplTest extends TestLogger {
private StateSnapshotContextSynchronousImpl snapshotContext;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
index 047aad8..8add242 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.operators;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
@@ -26,9 +24,12 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
/**
* Tests for {@link StreamFilter}. These test that:
*
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
index e4e29c1..8ea1e12 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.operators;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
@@ -27,9 +25,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
/**
* Tests for {@link StreamMap}. These test that:
*
@@ -41,7 +42,7 @@ import org.junit.Test;
*/
public class StreamFlatMapTest {
- public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
+ private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 05a910f..3ebb9ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.operators;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
@@ -33,6 +31,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Test;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index b61c760..f1c9bca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.operators;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -31,9 +29,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
/**
* Tests for {@link StreamGroupedReduce}. These test that:
*