You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:22 UTC
[22/22] flink git commit: [FLINK-6731] [tests] Activate strict
checkstyle for flink-tests
[FLINK-6731] [tests] Activate strict checkstyle for flink-tests
This closes #4295
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bd491e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bd491e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bd491e0
Branch: refs/heads/master
Commit: 9bd491e05120915cbde36d4452e3982fe5d0975f
Parents: 480ccfb
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue May 30 15:40:47 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 12 18:37:47 2017 -0400
----------------------------------------------------------------------
.../api/java/batch/TableEnvironmentITCase.java | 2 +-
.../api/java/batch/sql/GroupingSetsITCase.java | 2 +-
.../table/api/java/batch/sql/SqlITCase.java | 2 +-
flink-tests/pom.xml | 35 +
.../accumulators/AccumulatorErrorITCase.java | 8 +-
.../test/accumulators/AccumulatorITCase.java | 72 +-
.../AccumulatorIterativeITCase.java | 21 +-
.../accumulators/AccumulatorLiveITCase.java | 54 +-
.../flink/test/actions/CountCollectITCase.java | 14 +-
.../broadcastvars/BroadcastBranchingITCase.java | 35 +-
.../broadcastvars/BroadcastUnionITCase.java | 14 +-
.../BroadcastVarInitializationITCase.java | 43 +-
.../test/cancelling/CancelingTestBase.java | 21 +-
.../test/cancelling/JoinCancelingITCase.java | 42 +-
.../test/cancelling/MapCancelingITCase.java | 28 +-
...tractEventTimeWindowCheckpointingITCase.java | 87 +-
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
.../CoStreamCheckpointingITCase.java | 31 +-
...ontinuousFileProcessingCheckpointITCase.java | 18 +-
.../EventTimeAllWindowCheckpointingITCase.java | 115 +-
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
.../KeyedStateCheckpointingITCase.java | 17 +-
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
.../test/checkpointing/RescalingITCase.java | 46 +-
...ckendEventTimeWindowCheckpointingITCase.java | 3 +
.../test/checkpointing/SavepointITCase.java | 54 +-
.../checkpointing/StateCheckpointedITCase.java | 71 +-
.../StreamCheckpointNotifierITCase.java | 98 +-
.../StreamCheckpointingITCase.java | 46 +-
.../StreamFaultToleranceTestBase.java | 11 +-
.../TimestampedFileInputSplitTest.java | 5 +
.../UdfStreamOperatorCheckpointingITCase.java | 15 +-
.../WindowCheckpointingITCase.java | 49 +-
.../utils/SavepointMigrationTestBase.java | 22 +-
...atefulJobSavepointFrom11MigrationITCase.java | 50 +-
...atefulJobSavepointFrom12MigrationITCase.java | 52 +-
.../test/classloading/ClassLoaderITCase.java | 17 +-
.../jar/CheckpointedStreamingProgram.java | 24 +-
.../jar/CheckpointingCustomKvStateProgram.java | 8 +-
.../jar/CustomInputSplitProgram.java | 21 +-
.../classloading/jar/CustomKvStateProgram.java | 3 +-
.../test/classloading/jar/KMeansForTest.java | 41 +-
.../jar/LegacyCheckpointedStreamingProgram.java | 19 +-
.../jar/StreamingCustomInputSplitProgram.java | 15 +-
.../test/classloading/jar/StreamingProgram.java | 18 +-
.../test/classloading/jar/UserCodeType.java | 6 +-
.../clients/examples/JobRetrievalITCase.java | 140 --
.../clients/examples/LocalExecutorITCase.java | 76 -
.../completeness/TypeInfoTestCoverageTest.java | 9 +-
.../distributedCache/DistributedCacheTest.java | 123 --
.../distributedcache/DistributedCacheTest.java | 125 ++
.../test/example/client/JobRetrievalITCase.java | 143 ++
.../example/client/LocalExecutorITCase.java | 81 +
.../failing/JobSubmissionFailsITCase.java | 208 +++
.../test/example/failing/TaskFailureITCase.java | 98 ++
.../example/java/ConnectedComponentsITCase.java | 65 +
.../example/java/EnumTriangleBasicITCase.java | 52 +
.../flink/test/example/java/PageRankITCase.java | 95 +
.../example/java/TransitiveClosureITCase.java | 64 +
.../test/example/java/WebLogAnalysisITCase.java | 57 +
.../test/example/java/WordCountITCase.java | 51 +
.../example/java/WordCountNestedPOJOITCase.java | 129 ++
.../example/java/WordCountSimplePOJOITCase.java | 110 ++
.../WordCountSubclassInterfacePOJOITCase.java | 169 ++
.../java/WordCountSubclassPOJOITCase.java | 133 ++
.../java/WordCountWithCollectionITCase.java | 70 +
.../scala/ConnectedComponentsITCase.java | 65 +
.../test/example/scala/EnumTriangleITCase.java | 52 +
.../test/example/scala/PageRankITCase.java | 100 ++
.../example/scala/TransitiveClosureITCase.java | 64 +
.../example/scala/WebLogAnalysisITCase.java | 57 +
.../test/example/scala/WordCountITCase.java | 57 +
.../ConnectedComponentsITCase.java | 63 -
.../EnumTriangleBasicITCase.java | 48 -
.../exampleJavaPrograms/PageRankITCase.java | 92 -
.../TransitiveClosureITCase.java | 63 -
.../WebLogAnalysisITCase.java | 53 -
.../exampleJavaPrograms/WordCountITCase.java | 47 -
.../WordCountNestedPOJOITCase.java | 118 --
.../WordCountSimplePOJOITCase.java | 102 --
.../WordCountSubclassInterfacePOJOITCase.java | 152 --
.../WordCountSubclassPOJOITCase.java | 123 --
.../WordCountWithCollectionITCase.java | 66 -
.../ConnectedComponentsITCase.java | 64 -
.../EnumTriangleITCase.java | 48 -
.../exampleScalaPrograms/PageRankITCase.java | 95 -
.../TransitiveClosureITCase.java | 63 -
.../WebLogAnalysisITCase.java | 53 -
.../exampleScalaPrograms/WordCountITCase.java | 53 -
.../JobSubmissionFailsITCase.java | 204 ---
.../test/failingPrograms/TaskFailureITCase.java | 98 --
.../hadoop/mapred/HadoopIOFormatsITCase.java | 55 +-
.../hadoop/mapred/WordCountMapredITCase.java | 11 +-
.../mapreduce/WordCountMapreduceITCase.java | 11 +-
.../apache/flink/test/io/CsvReaderITCase.java | 11 +-
.../apache/flink/test/io/InputOutputITCase.java | 2 +-
.../flink/test/io/RichInputOutputITCase.java | 8 +-
.../BulkIterationWithAllReducerITCase.java | 43 +-
.../CoGroupConnectedComponentsITCase.java | 25 +-
.../CoGroupConnectedComponentsSecondITCase.java | 73 +-
.../iterative/ConnectedComponentsITCase.java | 19 +-
...ectedComponentsWithDeferredUpdateITCase.java | 34 +-
.../ConnectedComponentsWithObjectMapITCase.java | 34 +-
...tedComponentsWithSolutionSetFirstITCase.java | 19 +-
.../test/iterative/DanglingPageRankITCase.java | 83 +-
...terationNotDependingOnSolutionSetITCase.java | 22 +-
.../DependencyConnectedComponentsITCase.java | 184 +-
.../iterative/EmptyWorksetIterationITCase.java | 25 +-
.../test/iterative/IdentityIterationITCase.java | 23 +-
...nIncompleteDynamicPathConsumptionITCase.java | 39 +-
...onIncompleteStaticPathConsumptionITCase.java | 39 +-
...IterationTerminationWithTerminationTail.java | 10 +-
.../IterationTerminationWithTwoTails.java | 11 +-
.../IterationWithAllReducerITCase.java | 6 +-
.../iterative/IterationWithChainingITCase.java | 15 +-
.../iterative/IterationWithUnionITCase.java | 24 +-
.../iterative/KMeansWithBroadcastSetITCase.java | 7 +-
.../MultipleSolutionSetJoinsITCase.java | 37 +-
.../iterative/SolutionSetDuplicatesITCase.java | 6 +-
.../StaticlyNestedIterationsITCase.java | 35 +-
.../UnionStaticDynamicIterationITCase.java | 27 +-
.../AggregatorConvergenceITCase.java | 294 ++--
.../aggregators/AggregatorsITCase.java | 67 +-
.../test/javaApiOperators/AggregateITCase.java | 180 --
.../CoGroupGroupSortITCase.java | 122 --
.../test/javaApiOperators/CoGroupITCase.java | 992 -----------
.../test/javaApiOperators/CrossITCase.java | 456 -----
.../CustomDistributionITCase.java | 359 ----
.../test/javaApiOperators/DataSinkITCase.java | 355 ----
.../test/javaApiOperators/DataSourceITCase.java | 81 -
.../test/javaApiOperators/DistinctITCase.java | 318 ----
.../ExecutionEnvironmentITCase.java | 95 -
.../test/javaApiOperators/FilterITCase.java | 327 ----
.../test/javaApiOperators/FirstNITCase.java | 151 --
.../test/javaApiOperators/FlatMapITCase.java | 360 ----
.../javaApiOperators/GroupCombineITCase.java | 482 ------
.../javaApiOperators/GroupReduceITCase.java | 1636 ------------------
.../flink/test/javaApiOperators/JoinITCase.java | 938 ----------
.../flink/test/javaApiOperators/MapITCase.java | 514 ------
.../javaApiOperators/MapPartitionITCase.java | 101 --
.../javaApiOperators/ObjectReuseITCase.java | 216 ---
.../test/javaApiOperators/OuterJoinITCase.java | 680 --------
.../test/javaApiOperators/PartitionITCase.java | 848 ---------
.../test/javaApiOperators/ProjectITCase.java | 64 -
.../test/javaApiOperators/ReduceITCase.java | 512 ------
.../ReduceWithCombinerITCase.java | 313 ----
.../RemoteEnvironmentITCase.java | 153 --
.../ReplicatingDataSourceITCase.java | 121 --
.../test/javaApiOperators/SampleITCase.java | 167 --
.../javaApiOperators/SortPartitionITCase.java | 343 ----
.../test/javaApiOperators/SumMinMaxITCase.java | 103 --
.../test/javaApiOperators/TypeHintITCase.java | 326 ----
.../test/javaApiOperators/UnionITCase.java | 132 --
.../util/CollectionDataSets.java | 725 --------
.../util/ValueCollectionDataSets.java | 730 --------
.../test/manual/CheckForbiddenMethodsUsage.java | 7 +-
.../HashTableRecordWidthCombinations.java | 19 +-
.../flink/test/manual/MassiveStringSorting.java | 161 +-
.../test/manual/MassiveStringValueSorting.java | 166 +-
.../test/manual/NotSoMiniClusterIterations.java | 8 +-
.../flink/test/manual/OverwriteObjects.java | 7 +-
.../flink/test/manual/ReducePerformance.java | 7 +-
.../manual/StreamingScalabilityAndLatency.java | 48 +-
.../apache/flink/test/manual/package-info.java | 3 +-
.../flink/test/misc/AutoParallelismITCase.java | 8 +-
.../test/misc/CustomPartitioningITCase.java | 22 +-
.../test/misc/CustomSerializationITCase.java | 30 +-
.../flink/test/misc/GenericTypeInfoTest.java | 13 +-
.../test/misc/MiscellaneousIssuesITCase.java | 32 +-
...SuccessAfterNetworkBuffersFailureITCase.java | 26 +-
.../flink/test/operators/AggregateITCase.java | 183 ++
.../test/operators/CoGroupGroupSortITCase.java | 125 ++
.../flink/test/operators/CoGroupITCase.java | 989 +++++++++++
.../flink/test/operators/CrossITCase.java | 457 +++++
.../operators/CustomDistributionITCase.java | 362 ++++
.../flink/test/operators/DataSinkITCase.java | 356 ++++
.../flink/test/operators/DataSourceITCase.java | 82 +
.../flink/test/operators/DistinctITCase.java | 322 ++++
.../operators/ExecutionEnvironmentITCase.java | 95 +
.../flink/test/operators/FilterITCase.java | 331 ++++
.../flink/test/operators/FirstNITCase.java | 156 ++
.../flink/test/operators/FlatMapITCase.java | 364 ++++
.../test/operators/GroupCombineITCase.java | 484 ++++++
.../flink/test/operators/GroupReduceITCase.java | 1633 +++++++++++++++++
.../apache/flink/test/operators/JoinITCase.java | 945 ++++++++++
.../apache/flink/test/operators/MapITCase.java | 518 ++++++
.../test/operators/MapPartitionITCase.java | 101 ++
.../flink/test/operators/ObjectReuseITCase.java | 215 +++
.../flink/test/operators/OuterJoinITCase.java | 682 ++++++++
.../flink/test/operators/PartitionITCase.java | 847 +++++++++
.../flink/test/operators/ProjectITCase.java | 67 +
.../flink/test/operators/ReduceITCase.java | 515 ++++++
.../operators/ReduceWithCombinerITCase.java | 317 ++++
.../test/operators/RemoteEnvironmentITCase.java | 157 ++
.../operators/ReplicatingDataSourceITCase.java | 118 ++
.../flink/test/operators/SampleITCase.java | 171 ++
.../test/operators/SortPartitionITCase.java | 347 ++++
.../flink/test/operators/SumMinMaxITCase.java | 108 ++
.../flink/test/operators/TypeHintITCase.java | 330 ++++
.../flink/test/operators/UnionITCase.java | 136 ++
.../test/operators/util/CollectionDataSets.java | 772 +++++++++
.../operators/util/ValueCollectionDataSets.java | 775 +++++++++
.../examples/KMeansSingleStepTest.java | 80 +-
.../examples/RelationalQueryCompilerTest.java | 156 +-
.../examples/WordCountCompilerTest.java | 20 +-
.../ConnectedComponentsCoGroupTest.java | 64 +-
...ultipleJoinsWithSolutionSetCompilerTest.java | 82 +-
.../iterations/PageRankCompilerTest.java | 63 +-
.../jsonplan/DumpCompiledPlanTest.java | 39 +-
.../jsonplan/JsonJobGraphGenerationTest.java | 87 +-
.../optimizer/jsonplan/PreviewPlanDumpTest.java | 39 +-
.../query/AbstractQueryableStateITCase.java | 52 +-
.../KVStateRequestSerializerRocksDBTest.java | 15 +-
.../query/QueryableStateITCaseFsBackend.java | 1 +
.../QueryableStateITCaseRocksDBBackend.java | 1 +
...ctTaskManagerProcessFailureRecoveryTest.java | 33 +-
.../flink/test/recovery/FastFailuresITCase.java | 15 +-
.../JobManagerHACheckpointRecoveryITCase.java | 72 +-
.../JobManagerHAJobGraphRecoveryITCase.java | 34 +-
...agerHAProcessFailureBatchRecoveryITCase.java | 24 +-
.../recovery/ProcessFailureCancelingITCase.java | 60 +-
...SimpleRecoveryFailureRateStrategyITBase.java | 4 +
...RecoveryFixedDelayRestartStrategyITBase.java | 4 +
.../test/recovery/SimpleRecoveryITCaseBase.java | 22 +-
.../TaskManagerFailureRecoveryITCase.java | 38 +-
...anagerProcessFailureBatchRecoveryITCase.java | 7 +-
...erProcessFailureStreamingRecoveryITCase.java | 24 +-
...ConsumePipelinedAndBlockingResultITCase.java | 5 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 36 +-
.../flink/test/runtime/JoinDeadlockITCase.java | 3 +-
.../runtime/NetworkStackThroughputITCase.java | 15 +-
.../RegisterTypeWithKryoSerializerITCase.java | 16 +-
.../test/runtime/SelfJoinDeadlockITCase.java | 3 +-
.../ZooKeeperLeaderElectionITCase.java | 19 +-
.../LocalFlinkMiniClusterITCase.java | 43 +-
.../test/state/ManualWindowSpeedITCase.java | 4 +-
.../state/StateHandleSerializationTest.java | 20 +-
.../AbstractOperatorRestoreTestBase.java | 21 +-
.../state/operator/restore/ExecutionMode.java | 3 +-
.../restore/keyed/KeyedComplexChainTest.java | 4 +
.../state/operator/restore/keyed/KeyedJob.java | 4 +-
...AbstractNonKeyedOperatorRestoreTestBase.java | 1 +
.../restore/unkeyed/ChainBreakTest.java | 1 +
.../unkeyed/ChainLengthDecreaseTest.java | 1 +
.../unkeyed/ChainLengthIncreaseTest.java | 1 +
.../restore/unkeyed/ChainOrderTest.java | 1 +
.../restore/unkeyed/ChainUnionTest.java | 1 +
.../operator/restore/unkeyed/NonKeyedJob.java | 4 +-
.../streaming/api/StreamingOperatorsITCase.java | 31 +-
.../api/outputformat/CsvOutputFormatITCase.java | 3 +
.../outputformat/TextOutputFormatITCase.java | 3 +
.../runtime/ChainedRuntimeContextITCase.java | 3 +
.../streaming/runtime/CoGroupJoinITCase.java | 18 +-
.../test/streaming/runtime/CoStreamITCase.java | 8 +-
.../streaming/runtime/DataStreamPojoITCase.java | 65 +-
.../streaming/runtime/DirectedOutputITCase.java | 3 +
.../test/streaming/runtime/IterateITCase.java | 115 +-
.../streaming/runtime/OutputSplitterITCase.java | 3 +
.../streaming/runtime/PartitionerITCase.java | 2 -
.../streaming/runtime/SelfConnectionITCase.java | 4 +-
.../streaming/runtime/SideOutputITCase.java | 25 +-
.../streaming/runtime/StateBackendITCase.java | 9 +-
.../runtime/StreamTaskTimerITCase.java | 22 +-
.../test/streaming/runtime/TimestampITCase.java | 156 +-
.../streaming/runtime/WindowFoldITCase.java | 3 +-
.../runtime/util/EvenOddOutputSelector.java | 4 +
.../test/streaming/runtime/util/NoOpIntMap.java | 4 +
.../runtime/util/ReceiveCheckNoOpSink.java | 6 +
.../runtime/util/TestListResultSink.java | 9 +-
.../streaming/runtime/util/TestListWrapper.java | 5 +-
.../flink/test/testfunctions/Tokenizer.java | 3 +
.../PojoSerializerUpgradeTest.java | 23 +-
.../org/apache/flink/test/util/CoordVector.java | 24 +-
.../flink/test/util/DataSetUtilsITCase.java | 36 +-
.../test/util/InfiniteIntegerInputFormat.java | 7 +-
.../util/InfiniteIntegerTupleInputFormat.java | 7 +-
.../apache/flink/test/util/PointFormatter.java | 6 +-
.../apache/flink/test/util/PointInFormat.java | 8 +-
.../org/apache/flink/test/util/TestUtils.java | 7 +-
.../UniformIntTupleGeneratorInputFormat.java | 8 +-
.../sessionwindows/EventGenerator.java | 2 +-
.../sessionwindows/EventGeneratorFactory.java | 3 +-
.../sessionwindows/GeneratorConfiguration.java | 22 +-
.../sessionwindows/GeneratorEventFactory.java | 15 +-
.../sessionwindows/LongRandomGenerator.java | 7 +-
.../ParallelSessionsEventGenerator.java | 14 +-
.../sessionwindows/SessionConfiguration.java | 4 +-
.../windowing/sessionwindows/SessionEvent.java | 2 +-
.../SessionEventGeneratorImpl.java | 12 +-
.../SessionGeneratorConfiguration.java | 2 +-
.../sessionwindows/SessionWindowITCase.java | 12 +-
.../sessionwindows/TestEventPayload.java | 4 +-
.../scala/operators/GroupCombineITCase.scala | 2 +-
295 files changed, 17887 insertions(+), 17270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index aac7e11..0def39a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestB
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.table.calcite.CalciteConfig;
import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.types.Row;
import org.apache.calcite.tools.RuleSets;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 6c1a753..29fbdf5 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index f4e5daf..6a72b3e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.types.Row;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index beac803..3f67a88 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -338,6 +338,41 @@ under the License.
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- Scala Code Style, most of the configuration done via plugin management -->
<plugin>
<groupId>org.scalastyle</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 25d9228..4de1602 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -32,18 +31,18 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-
import static org.junit.Assert.fail;
/**
- * Tests cases where Accumulator are
+ * Tests cases where accumulators:
* a) throw errors during runtime
- * b) is not compatible with existing accumulator
+ * b) are not compatible with existing accumulator.
*/
public class AccumulatorErrorITCase extends TestLogger {
@@ -91,7 +90,6 @@ public class AccumulatorErrorITCase extends TestLogger {
}
}
-
@Test
public void testInvalidTypeAccumulator() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 5f2b0a9..3e35bd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -18,10 +18,6 @@
package org.apache.flink.test.accumulators;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -38,16 +34,20 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
-import org.junit.Assert;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.junit.Assert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
/**
* Test for the basic functionality of accumulators. We cannot test all different
* kinds of plans here (iterative, etc.).
- *
- * TODO Test conflict when different UDFs write to accumulator with same name
+ *
+ * <p>TODO Test conflict when different UDFs write to accumulator with same name
* but with different type. The conflict will occur in JobManager while merging.
*/
@SuppressWarnings("serial")
@@ -60,31 +60,31 @@ public class AccumulatorITCase extends JavaProgramTestBase {
private String resultPath;
private JobExecutionResult result;
-
+
@Override
protected void preSubmit() throws Exception {
dataPath = createTempFile("datapoints.txt", INPUT);
resultPath = getTempFilePath("result");
}
-
+
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
-
+
// Test accumulator results
System.out.println("Accumulator results:");
JobExecutionResult res = this.result;
System.out.println(AccumulatorHelper.getResultsFormatted(res.getAllAccumulatorResults()));
- Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines"));
+ Assert.assertEquals(Integer.valueOf(3), res.getAccumulatorResult("num-lines"));
+
+ Assert.assertEquals(Double.valueOf(getParallelism()), res.getAccumulatorResult("open-close-counter"));
- Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
-
// Test histogram (words per line distribution)
Map<Integer, Integer> dist = Maps.newHashMap();
dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
-
+
// Test distinct words (custom accumulator)
Set<StringValue> distinctWords = Sets.newHashSet();
distinctWords.add(new StringValue("one"));
@@ -96,18 +96,18 @@ public class AccumulatorITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> input = env.readTextFile(dataPath);
-
+
+ DataSet<String> input = env.readTextFile(dataPath);
+
input.flatMap(new TokenizeLine())
.groupBy(0)
.reduceGroup(new CountWords())
.writeAsCsv(resultPath, "\n", " ");
-
+
this.result = env.execute();
}
-
- public static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+
+ private static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
// Needs to be instantiated later since the runtime context is not yet
// initialized at this place
@@ -120,7 +120,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
@Override
public void open(Configuration parameters) {
-
+
// Add counters using convenience functions
this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
this.wordsPerLineDistribution = getRuntimeContext().getHistogram("words-per-line");
@@ -157,20 +157,20 @@ public class AccumulatorITCase extends JavaProgramTestBase {
// Test counter used in open() and closed()
this.openCloseCounter.add(0.5);
}
-
+
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
this.cntNumLines.add(1);
int wordsPerLine = 0;
-
+
for (String token : value.toLowerCase().split("\\W+")) {
distinctWords.add(new StringValue(token));
out.collect(new Tuple2<>(token, 1));
- ++ wordsPerLine;
+ ++wordsPerLine;
}
wordsPerLineDistribution.add(wordsPerLine);
}
-
+
@Override
public void close() throws Exception {
// Test counter used in open and close only
@@ -179,37 +179,35 @@ public class AccumulatorITCase extends JavaProgramTestBase {
}
}
-
- public static class CountWords
+ private static class CountWords
extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
- implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
- {
-
+ implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
private IntCounter reduceCalls;
private IntCounter combineCalls;
-
+
@Override
public void open(Configuration parameters) {
this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
}
-
+
@Override
public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
reduceCalls.add(1);
reduceInternal(values, out);
}
-
+
@Override
public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
combineCalls.add(1);
reduceInternal(values, out);
}
-
+
private void reduceInternal(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
int sum = 0;
String key = null;
-
+
for (Tuple2<String, Integer> e : values) {
key = e.f0;
sum += e.f1;
@@ -217,9 +215,9 @@ public class AccumulatorITCase extends JavaProgramTestBase {
out.collect(new Tuple2<>(key, sum));
}
}
-
+
/**
- * Custom accumulator
+ * Custom accumulator.
*/
public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index d86d517..3e19ce8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.accumulators;
import org.apache.flink.api.common.accumulators.IntCounter;
@@ -27,13 +26,17 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
-public class AccumulatorIterativeITCase extends JavaProgramTestBase {
+/**
+ * Test accumulator within iteration.
+ */
+public class AccumulatorIterativeITCase extends JavaProgramTestBase {
private static final int NUM_ITERATIONS = 3;
private static final int NUM_SUBTASKS = 1;
private static final String ACC_NAME = "test";
-
+
@Override
protected boolean skipCollectionExecution() {
return true;
@@ -43,20 +46,20 @@ public class AccumulatorIterativeITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(NUM_SUBTASKS);
-
+
IterativeDataSet<Integer> iteration = env.fromElements(1, 2, 3).iterate(NUM_ITERATIONS);
-
+
iteration.closeWith(iteration.reduceGroup(new SumReducer())).output(new DiscardingOutputFormat<Integer>());
Assert.assertEquals(NUM_ITERATIONS * 6, (int) env.execute().getAccumulatorResult(ACC_NAME));
}
-
+
static final class SumReducer extends RichGroupReduceFunction<Integer, Integer> {
-
+
private static final long serialVersionUID = 1L;
-
+
private IntCounter testCounter = new IntCounter();
-
+
@Override
public void open(Configuration config) throws Exception {
getRuntimeContext().addAccumulator(ACC_NAME, this.testCounter);
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 92e5768..756b81e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -18,12 +18,6 @@
package org.apache.flink.test.accumulators;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
@@ -55,39 +49,42 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
-
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.fail;
/**
* Tests the availability of accumulator results during runtime. The test case tests a user-defined
* accumulator and Flink's internal accumulators for two consecutive tasks.
*
- * CHAINED[Source -> Map] -> Sink
+ * <p>CHAINED[Source -> Map] -> Sink
*
- * Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
+ * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
* the task to the task manager which notifies the job manager and sends the current accumulators.
* The task blocks until the test has been notified about the current accumulator values.
*
- * A barrier between the operators ensures that that pipelining is disabled for the streaming test.
+ * <p>A barrier between the operators ensures that that pipelining is disabled for the streaming test.
* The batch job reads the records one at a time. The streaming code buffers the records beforehand;
* that's why exact guarantees about the number of records read are very hard to make. Thus, why we
* check for an upper bound of the elements read.
@@ -104,7 +101,7 @@ public class AccumulatorLiveITCase extends TestLogger {
private static JobGraph jobGraph;
// name of user accumulator
- private static String ACCUMULATOR_NAME = "test";
+ private static final String ACCUMULATOR_NAME = "test";
// number of heartbeat intervals to check
private static final int NUM_ITERATIONS = 5;
@@ -113,7 +110,6 @@ public class AccumulatorLiveITCase extends TestLogger {
private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
@Before
public void before() throws Exception {
system = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -129,8 +125,8 @@ public class AccumulatorLiveITCase extends TestLogger {
taskManager = testingCluster.getTaskManagersAsJava().get(0);
// generate test data
- for (int i=0; i < NUM_ITERATIONS; i++) {
- inputData.add(i, String.valueOf(i+1));
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ inputData.add(i, String.valueOf(i + 1));
}
NotifyingMapper.finished = false;
@@ -163,7 +159,6 @@ public class AccumulatorLiveITCase extends TestLogger {
verifyResults();
}
-
@Test
public void testStreaming() throws Exception {
@@ -175,14 +170,12 @@ public class AccumulatorLiveITCase extends TestLogger {
.flatMap(new NotifyingMapper())
.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
-
jobGraph = env.getStreamGraph().getJobGraph();
jobID = jobGraph.getJobID();
verifyResults();
}
-
private static void verifyResults() {
new JavaTestKit(system) {{
@@ -201,7 +194,6 @@ public class AccumulatorLiveITCase extends TestLogger {
selfGateway);
expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
-
TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
@@ -210,7 +202,7 @@ public class AccumulatorLiveITCase extends TestLogger {
ExecutionAttemptID sinkTaskID = null;
/* Check for accumulator values */
- if(checkUserAccumulators(0, userAccumulators)) {
+ if (checkUserAccumulators(0, userAccumulators)) {
LOG.info("Passed initial check for map task.");
} else {
fail("Wrong accumulator results when map task begins execution.");
@@ -244,7 +236,7 @@ public class AccumulatorLiveITCase extends TestLogger {
msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
userAccumulators = msg.userAccumulators();
- if(checkUserAccumulators(expectedAccVal, userAccumulators)) {
+ if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
LOG.info("Passed initial check for sink task.");
} else {
fail("Wrong accumulator results when sink task begins execution.");
@@ -272,14 +264,13 @@ public class AccumulatorLiveITCase extends TestLogger {
}};
}
-
- private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
+ private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
LOG.info("checking user accumulators");
- return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+ return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
}
/**
- * UDF that notifies when it changes the accumulator values
+ * UDF that notifies when it changes the accumulator values.
*/
private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
private static final long serialVersionUID = 1L;
@@ -356,7 +347,7 @@ public class AccumulatorLiveITCase extends TestLogger {
}
/**
- * Helpers to generate the JobGraph
+ * Helpers to generate the JobGraph.
*/
private static JobGraph getOptimizedPlan(Plan plan) {
Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
@@ -376,7 +367,6 @@ public class AccumulatorLiveITCase extends TestLogger {
}
}
-
/**
* This is used to for creating the example topology. {@link #execute} is never called, we
* only use this to call {@link #getStreamGraph()}.
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
index e742c27..737bdfb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -21,17 +21,18 @@ package org.apache.flink.test.actions;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
import org.apache.flink.test.util.MultipleProgramsTestBase;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
/**
* Tests the methods that bring elements back to the client driver program.
*/
@@ -64,7 +65,6 @@ public class CountCollectITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableObjectReuse();
-
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index 5f2950c..720784e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.broadcastvars;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -35,6 +32,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test broadcast input after branching.
+ */
public class BroadcastBranchingITCase extends JavaProgramTestBase {
private static final String RESULT = "(2,112)\n";
@@ -63,16 +66,16 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
.fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));
// Jn1 matches x and y values on id and emits (id, x, y) triples
- JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1
- = sc2.join(sc3).where(0).equalTo(0).with(new Jn1());
+ JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1 =
+ sc2.join(sc3).where(0).equalTo(0).with(new Jn1());
// Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
- JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2
- = jn1.join(sc1).where(0).equalTo(0).with(new Jn2());
+ JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2 =
+ jn1.join(sc1).where(0).equalTo(0).with(new Jn2());
// Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
- FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1
- = jn1.flatMap(new Mp1());
+ FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1 =
+ jn1.flatMap(new Mp1());
// Mp2 filters out all p values which can be divided by z
List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();
@@ -80,7 +83,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
JavaProgramTestBase.compareResultAsText(result, RESULT);
}
- public static class Jn1 implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+ private static class Jn1 implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 1L;
@Override
@@ -89,7 +92,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
}
}
- public static class Jn2 implements JoinFunction<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> {
+ private static class Jn2 implements JoinFunction<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
private static int p(int x, int a, int b, int c) {
@@ -104,14 +107,14 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
int b = second.f2;
int c = second.f3;
- int p_x = p(x, a, b, c);
- int p_y = p(y, a, b, c);
- int min = Math.min(p_x, p_y);
+ int pX = p(x, a, b, c);
+ int pY = p(y, a, b, c);
+ int min = Math.min(pX, pY);
return new Tuple2<>(first.f0, min);
}
}
- public static class Mp1 implements FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
+ private static class Mp1 implements FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
@@ -122,7 +125,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
}
}
- public static class Mp2 extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+ private static class Mp2 extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
private Collection<Tuple2<String, Integer>> zs;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
index 113a330..79b0033 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
@@ -15,17 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.test.broadcastvars;
-import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
+
import org.junit.Assert;
+import java.util.List;
+
+/**
+ * Test broadcast input after union.
+ */
public class BroadcastUnionITCase extends JavaProgramTestBase {
private static final String BC_NAME = "bc";
@@ -43,11 +49,11 @@ public class BroadcastUnionITCase extends JavaProgramTestBase {
.withBroadcastSet(bc1.union(bc2), BC_NAME)
.reduce(new Reducer())
.collect();
-
+
Assert.assertEquals(Long.valueOf(3025), result.get(0));
}
- public static class Mapper extends RichMapFunction<Long, Long> {
+ private static class Mapper extends RichMapFunction<Long, Long> {
private List<Long> values;
@Override
@@ -65,7 +71,7 @@ public class BroadcastUnionITCase extends JavaProgramTestBase {
}
}
- public static class Reducer implements ReduceFunction<Long> {
+ private static class Reducer implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
index 41d24b8..26f5763 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
@@ -18,10 +18,6 @@
package org.apache.flink.test.broadcastvars;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.DataSet;
@@ -31,36 +27,43 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Test {@link BroadcastVariableInitializer}.
+ */
@SuppressWarnings("serial")
public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
-
+
@Override
protected void testProgram() throws Exception {
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
-
+
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
-
+
IterativeDataSet<Integer> iteration = data.iterate(10);
-
+
DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");
-
+
final List<Integer> resultList = new ArrayList<Integer>();
iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));
-
+
env.execute();
-
+
Assert.assertEquals(8, resultList.get(0).intValue());
}
-
- public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
-
+ private static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
+
private Integer bcValue;
-
+
@Override
public void open(Configuration parameters) {
this.bcValue = getRuntimeContext().getBroadcastVariableWithInitializer("bc", new PickFirstInitializer());
@@ -72,8 +75,8 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
return;
}
final int x = bcValue;
-
- for (Integer y : records) {
+
+ for (Integer y : records) {
if (y > x) {
out.collect(y);
return;
@@ -83,8 +86,8 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
out.collect(bcValue);
}
}
-
- public static class PickFirstInitializer implements BroadcastVariableInitializer<Integer, Integer> {
+
+ private static class PickFirstInitializer implements BroadcastVariableInitializer<Integer, Integer> {
@Override
public Integer initializeBroadcastVariable(Iterable<Integer> data) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 2767312..88c921d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.cancelling;
import org.apache.flink.api.common.Plan;
@@ -35,31 +34,33 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.util.TestLogger;
+
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
/**
- *
+ * Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
-
+
private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
private static final int MINIMUM_HEAP_SIZE_MB = 192;
-
+
/**
* Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
* is canceled), starting from the point in time when the cancel request is issued.
@@ -69,13 +70,13 @@ public abstract class CancelingTestBase extends TestLogger {
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
// --------------------------------------------------------------------------------------------
-
+
protected LocalFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
-
+
// --------------------------------------------------------------------------------------------
-
+
private void verifyJvmOptions() {
final long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
@@ -112,7 +113,7 @@ public abstract class CancelingTestBase extends TestLogger {
public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
}
-
+
public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
try {
// submit job
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index d797e47..5e21129 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.cancelling;
import org.apache.flink.api.common.functions.JoinFunction;
@@ -31,13 +30,16 @@ import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
+/**
+ * Test job cancellation from within a JoinFunction.
+ */
public class JoinCancelingITCase extends CancelingTestBase {
private static final int parallelism = 4;
public JoinCancelingITCase() {
setTaskManagerNumSlots(parallelism);
}
-
+
// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
executeTask(joiner, slow, parallelism);
@@ -68,7 +70,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false);
}
-
+
// @Test
public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
executeTask(new StuckInOpenMatcher<Integer>(), false);
@@ -92,45 +94,45 @@ public class JoinCancelingITCase extends CancelingTestBase {
runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
-
+
// @Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
executeTaskWithGenerator(new SimpleMatcher<Integer>(), 50000, 100, 30 * 1000, 30 * 1000);
}
// --------------- Test Sort Matches that are canceled while in the Matching Phase -----------------
-
+
// @Test
public void testCancelSortMatchWhileJoining() throws Exception {
executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 * 1000, 20 * 1000);
}
-
+
// @Test
public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 * 1000);
}
// -------------------------------------- Test System corner cases ---------------------------------
-
+
// @Test
public void testCancelSortMatchWithHighparallelism() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false, 64);
}
// --------------------------------------------------------------------------------------------
-
- public static final class SimpleMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+ private static final class SimpleMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
private static final long serialVersionUID = 1L;
-
+
@Override
public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
return new Tuple2<>(first.f0, second.f0);
}
}
-
- public static final class DelayingMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+ private static final class DelayingMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
private static final long serialVersionUID = 1L;
-
+
private static final int WAIT_TIME_PER_RECORD = 10 * 1000; // 10 sec.
@Override
@@ -139,12 +141,12 @@ public class JoinCancelingITCase extends CancelingTestBase {
return new Tuple2<>(first.f0, second.f0);
}
}
-
- public static final class LongCancelTimeMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+ private static final class LongCancelTimeMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
private static final long serialVersionUID = 1L;
-
+
private static final int WAIT_TIME_PER_RECORD = 5 * 1000; // 5 sec.
-
+
@Override
public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
final long start = System.currentTimeMillis();
@@ -157,10 +159,10 @@ public class JoinCancelingITCase extends CancelingTestBase {
return new Tuple2<>(first.f0, second.f0);
}
}
-
- public static final class StuckInOpenMatcher<IN> extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+ private static final class StuckInOpenMatcher<IN> extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
private static final long serialVersionUID = 1L;
-
+
@Override
public void open(Configuration parameters) throws Exception {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 03b1a24..3a7039f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -24,30 +24,34 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
+
import org.junit.Test;
+/**
+ * Test job cancellation from within a MapFunction.
+ */
public class MapCancelingITCase extends CancelingTestBase {
private static final int parallelism = 4;
public MapCancelingITCase() {
setTaskManagerNumSlots(parallelism);
}
-
+
@Test
public void testMapCancelling() throws Exception {
executeTask(new IdentityMapper<Integer>());
}
-
+
@Test
public void testSlowMapCancelling() throws Exception {
executeTask(new DelayingIdentityMapper<Integer>());
}
-
+
@Test
public void testMapWithLongCancellingResponse() throws Exception {
executeTask(new LongCancelTimeIdentityMapper<Integer>());
}
-
+
@Test
public void testMapPriorToFirstRecordReading() throws Exception {
executeTask(new StuckInOpenIdentityMapper<Integer>());
@@ -67,8 +71,8 @@ public class MapCancelingITCase extends CancelingTestBase {
}
// --------------------------------------------------------------------------------------------
-
- public static final class IdentityMapper<IN> implements MapFunction<IN, IN> {
+
+ private static final class IdentityMapper<IN> implements MapFunction<IN, IN> {
private static final long serialVersionUID = 1L;
@Override
@@ -76,8 +80,8 @@ public class MapCancelingITCase extends CancelingTestBase {
return value;
}
}
-
- public static final class DelayingIdentityMapper<IN> implements MapFunction<IN, IN> {
+
+ private static final class DelayingIdentityMapper<IN> implements MapFunction<IN, IN> {
private static final long serialVersionUID = 1L;
private static final int WAIT_TIME_PER_VALUE = 10 * 1000; // 10 sec.
@@ -88,8 +92,8 @@ public class MapCancelingITCase extends CancelingTestBase {
return value;
}
}
-
- public static final class LongCancelTimeIdentityMapper<IN> implements MapFunction<IN, IN> {
+
+ private static final class LongCancelTimeIdentityMapper<IN> implements MapFunction<IN, IN> {
private static final long serialVersionUID = 1L;
private static final int WAIT_TIME_PER_VALUE = 5 * 1000; // 5 sec.
@@ -108,8 +112,8 @@ public class MapCancelingITCase extends CancelingTestBase {
return value;
}
}
-
- public static final class StuckInOpenIdentityMapper<IN> extends RichMapFunction<IN, IN> {
+
+ private static final class StuckInOpenIdentityMapper<IN> extends RichMapFunction<IN, IN> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 0021b81..269b126 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -169,11 +170,11 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
- final int WINDOW_SIZE = windowSize();
- final int NUM_KEYS = numKeys();
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
FailingSource.reset();
-
+
try {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -183,10 +184,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.setStateBackend(this.stateBackend);
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
.keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
private boolean open = false;
@@ -217,8 +218,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -239,9 +239,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
- final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
- final int WINDOW_SIZE = windowSize();
- final int NUM_KEYS = numKeys();
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
FailingSource.reset();
try {
@@ -254,10 +254,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.setStateBackend(this.stateBackend);
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
.keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
private boolean open = false;
@@ -292,8 +292,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
}
})
- .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -305,10 +304,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
- final int WINDOW_SIZE = windowSize();
- final int WINDOW_SLIDE = windowSlide();
- final int NUM_KEYS = numKeys();
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
FailingSource.reset();
try {
@@ -322,10 +321,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
.keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
private boolean open = false;
@@ -356,8 +355,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -369,9 +367,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testPreAggregatedTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
- final int WINDOW_SIZE = windowSize();
- final int NUM_KEYS = numKeys();
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
FailingSource.reset();
try {
@@ -383,10 +381,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.setStateBackend(this.stateBackend);
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
.keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
.reduce(
new ReduceFunction<Tuple2<Long, IntType>>() {
@@ -425,8 +423,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -438,10 +435,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
@Test
public void testPreAggregatedSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
- final int WINDOW_SIZE = windowSize();
- final int WINDOW_SLIDE = windowSlide();
- final int NUM_KEYS = numKeys();
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
FailingSource.reset();
try {
@@ -453,10 +450,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.setStateBackend(this.stateBackend);
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
.keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
.reduce(
new ReduceFunction<Tuple2<Long, IntType>>() {
@@ -497,8 +494,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -508,14 +504,12 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
}
-
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener
- {
+ implements ListCheckpointed<Integer>, CheckpointListener {
private static volatile boolean failedBefore = false;
private final int numKeys;
@@ -556,8 +550,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements))
- {
+ (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
// the function failed before, or we are in the elements before the failure
synchronized (ctx.getCheckpointLock()) {
int next = numElementsEmitted++;
@@ -669,7 +662,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
Integer curr = windowCounts.get(value.f0);
if (curr != null) {
windowCounts.put(value.f0, curr + 1);
@@ -756,7 +748,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
windowCounts.put(value.f0, 1);
}
-
// verify the contents of that window, the contents should be:
// (key + num windows so far)
@@ -799,13 +790,15 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
// Utilities
// ------------------------------------------------------------------------
- public static class IntType {
+ private static class IntType {
public int value;
public IntType() {}
- public IntType(int value) { this.value = value; }
+ public IntType(int value) {
+ this.value = value;
+ }
}
protected int numElementsPerKey() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..f0db4d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for asynchronous file backend.
+ */
public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..70ec757 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for asynchronous memory backend.
+ */
public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 06d3ab0..d224905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -50,12 +50,10 @@ import static org.junit.Assert.assertTrue;
* {@link org.apache.flink.test.checkpointing.StreamCheckpointingITCase} in that it contains
* a TwoInput (or co-) Task.
*
- * <p>
- * This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether
+ * <p>This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether
* this barriers are correctly forwarded.
*
- * <p>
- * The test triggers a failure after a while and verifies that, after completion, the
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
* state reflects the "exactly once" semantics.
*/
@SuppressWarnings({"serial", "deprecation"})
@@ -65,8 +63,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
private static final int PARALLELISM = 4;
/**
- * Runs the following program:
- *
+ * Runs the following program.
* <pre>
* [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
@@ -136,7 +133,6 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
assertEquals(NUM_STRINGS, countSum);
}
-
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
@@ -145,7 +141,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
* A generating source that is slow before the first two checkpoints went through
* and will indefinitely stall at a certain point to allow the checkpoint to complete.
*
- * After the checkpoints are through, it continues with full speed.
+ * <p>After the checkpoints are through, it continues with full speed.
*/
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
implements ListCheckpointed<Integer>, CheckpointListener {
@@ -174,7 +170,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
final int step = getRuntimeContext().getNumberOfParallelSubtasks();
if (index < 0) {
// not been restored, so initialize
- index =getRuntimeContext().getIndexOfThisSubtask();
+ index = getRuntimeContext().getIndexOfThisSubtask();
}
while (isRunning && index < numElements) {
@@ -243,11 +239,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
implements ListCheckpointed<Long> {
- static final long[] counts = new long[PARALLELISM];
-
+ static long[] counts = new long[PARALLELISM];
+
private long count;
@Override
@@ -312,7 +308,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
- static final long[] counts = new long[PARALLELISM];
+ static long[] counts = new long[PARALLELISM];
private long count = 0L;
@@ -342,8 +338,8 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
+
+ static long[] counts = new long[PARALLELISM];
private long count;
@@ -374,7 +370,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> {
- static final long[] counts = new long[PARALLELISM];
+ static long[] counts = new long[PARALLELISM];
private long count;
@@ -409,6 +405,9 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
}
+ /**
+ * POJO storing a prefix, value, and count.
+ */
public static class PrefixCount implements Serializable {
public String prefix;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 0ad42bb..280e11a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -30,10 +31,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -56,6 +57,9 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Test checkpointing while sourcing a continuous file processor.
+ */
public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
private static final int NO_OF_FILES = 5;
@@ -68,7 +72,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
private static String localFsURI;
private FileCreator fc;
- private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
+ private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
@BeforeClass
public static void createHDFS() {
@@ -78,10 +82,10 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
- localFsURI = "file:///" + baseDir +"/";
+ localFsURI = "file:///" + baseDir + "/";
localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
- } catch(Throwable e) {
+ } catch (Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
@@ -279,7 +283,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
public void run() {
try {
- for(int i = 0; i < NO_OF_FILES; i++) {
+ for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
long modTime;
do {
@@ -338,8 +342,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
FSDataOutputStream stream = localFs.create(tmp);
StringBuilder str = new StringBuilder();
- for(int i = 0; i < LINES_PER_FILE; i++) {
- String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+ for (int i = 0; i < LINES_PER_FILE; i++) {
+ String line = fileIdx + ": " + sampleLine + " " + i + "\n";
str.append(line);
stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}