Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:22 UTC

[22/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

[FLINK-6731] [tests] Activate strict checkstyle for flink-tests

This closes #4295


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bd491e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bd491e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bd491e0

Branch: refs/heads/master
Commit: 9bd491e05120915cbde36d4452e3982fe5d0975f
Parents: 480ccfb
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue May 30 15:40:47 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 12 18:37:47 2017 -0400

----------------------------------------------------------------------
 .../api/java/batch/TableEnvironmentITCase.java  |    2 +-
 .../api/java/batch/sql/GroupingSetsITCase.java  |    2 +-
 .../table/api/java/batch/sql/SqlITCase.java     |    2 +-
 flink-tests/pom.xml                             |   35 +
 .../accumulators/AccumulatorErrorITCase.java    |    8 +-
 .../test/accumulators/AccumulatorITCase.java    |   72 +-
 .../AccumulatorIterativeITCase.java             |   21 +-
 .../accumulators/AccumulatorLiveITCase.java     |   54 +-
 .../flink/test/actions/CountCollectITCase.java  |   14 +-
 .../broadcastvars/BroadcastBranchingITCase.java |   35 +-
 .../broadcastvars/BroadcastUnionITCase.java     |   14 +-
 .../BroadcastVarInitializationITCase.java       |   43 +-
 .../test/cancelling/CancelingTestBase.java      |   21 +-
 .../test/cancelling/JoinCancelingITCase.java    |   42 +-
 .../test/cancelling/MapCancelingITCase.java     |   28 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   87 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 .../CoStreamCheckpointingITCase.java            |   31 +-
 ...ontinuousFileProcessingCheckpointITCase.java |   18 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  115 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 .../KeyedStateCheckpointingITCase.java          |   17 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 .../test/checkpointing/RescalingITCase.java     |   46 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |    3 +
 .../test/checkpointing/SavepointITCase.java     |   54 +-
 .../checkpointing/StateCheckpointedITCase.java  |   71 +-
 .../StreamCheckpointNotifierITCase.java         |   98 +-
 .../StreamCheckpointingITCase.java              |   46 +-
 .../StreamFaultToleranceTestBase.java           |   11 +-
 .../TimestampedFileInputSplitTest.java          |    5 +
 .../UdfStreamOperatorCheckpointingITCase.java   |   15 +-
 .../WindowCheckpointingITCase.java              |   49 +-
 .../utils/SavepointMigrationTestBase.java       |   22 +-
 ...atefulJobSavepointFrom11MigrationITCase.java |   50 +-
 ...atefulJobSavepointFrom12MigrationITCase.java |   52 +-
 .../test/classloading/ClassLoaderITCase.java    |   17 +-
 .../jar/CheckpointedStreamingProgram.java       |   24 +-
 .../jar/CheckpointingCustomKvStateProgram.java  |    8 +-
 .../jar/CustomInputSplitProgram.java            |   21 +-
 .../classloading/jar/CustomKvStateProgram.java  |    3 +-
 .../test/classloading/jar/KMeansForTest.java    |   41 +-
 .../jar/LegacyCheckpointedStreamingProgram.java |   19 +-
 .../jar/StreamingCustomInputSplitProgram.java   |   15 +-
 .../test/classloading/jar/StreamingProgram.java |   18 +-
 .../test/classloading/jar/UserCodeType.java     |    6 +-
 .../clients/examples/JobRetrievalITCase.java    |  140 --
 .../clients/examples/LocalExecutorITCase.java   |   76 -
 .../completeness/TypeInfoTestCoverageTest.java  |    9 +-
 .../distributedCache/DistributedCacheTest.java  |  123 --
 .../distributedcache/DistributedCacheTest.java  |  125 ++
 .../test/example/client/JobRetrievalITCase.java |  143 ++
 .../example/client/LocalExecutorITCase.java     |   81 +
 .../failing/JobSubmissionFailsITCase.java       |  208 +++
 .../test/example/failing/TaskFailureITCase.java |   98 ++
 .../example/java/ConnectedComponentsITCase.java |   65 +
 .../example/java/EnumTriangleBasicITCase.java   |   52 +
 .../flink/test/example/java/PageRankITCase.java |   95 +
 .../example/java/TransitiveClosureITCase.java   |   64 +
 .../test/example/java/WebLogAnalysisITCase.java |   57 +
 .../test/example/java/WordCountITCase.java      |   51 +
 .../example/java/WordCountNestedPOJOITCase.java |  129 ++
 .../example/java/WordCountSimplePOJOITCase.java |  110 ++
 .../WordCountSubclassInterfacePOJOITCase.java   |  169 ++
 .../java/WordCountSubclassPOJOITCase.java       |  133 ++
 .../java/WordCountWithCollectionITCase.java     |   70 +
 .../scala/ConnectedComponentsITCase.java        |   65 +
 .../test/example/scala/EnumTriangleITCase.java  |   52 +
 .../test/example/scala/PageRankITCase.java      |  100 ++
 .../example/scala/TransitiveClosureITCase.java  |   64 +
 .../example/scala/WebLogAnalysisITCase.java     |   57 +
 .../test/example/scala/WordCountITCase.java     |   57 +
 .../ConnectedComponentsITCase.java              |   63 -
 .../EnumTriangleBasicITCase.java                |   48 -
 .../exampleJavaPrograms/PageRankITCase.java     |   92 -
 .../TransitiveClosureITCase.java                |   63 -
 .../WebLogAnalysisITCase.java                   |   53 -
 .../exampleJavaPrograms/WordCountITCase.java    |   47 -
 .../WordCountNestedPOJOITCase.java              |  118 --
 .../WordCountSimplePOJOITCase.java              |  102 --
 .../WordCountSubclassInterfacePOJOITCase.java   |  152 --
 .../WordCountSubclassPOJOITCase.java            |  123 --
 .../WordCountWithCollectionITCase.java          |   66 -
 .../ConnectedComponentsITCase.java              |   64 -
 .../EnumTriangleITCase.java                     |   48 -
 .../exampleScalaPrograms/PageRankITCase.java    |   95 -
 .../TransitiveClosureITCase.java                |   63 -
 .../WebLogAnalysisITCase.java                   |   53 -
 .../exampleScalaPrograms/WordCountITCase.java   |   53 -
 .../JobSubmissionFailsITCase.java               |  204 ---
 .../test/failingPrograms/TaskFailureITCase.java |   98 --
 .../hadoop/mapred/HadoopIOFormatsITCase.java    |   55 +-
 .../hadoop/mapred/WordCountMapredITCase.java    |   11 +-
 .../mapreduce/WordCountMapreduceITCase.java     |   11 +-
 .../apache/flink/test/io/CsvReaderITCase.java   |   11 +-
 .../apache/flink/test/io/InputOutputITCase.java |    2 +-
 .../flink/test/io/RichInputOutputITCase.java    |    8 +-
 .../BulkIterationWithAllReducerITCase.java      |   43 +-
 .../CoGroupConnectedComponentsITCase.java       |   25 +-
 .../CoGroupConnectedComponentsSecondITCase.java |   73 +-
 .../iterative/ConnectedComponentsITCase.java    |   19 +-
 ...ectedComponentsWithDeferredUpdateITCase.java |   34 +-
 .../ConnectedComponentsWithObjectMapITCase.java |   34 +-
 ...tedComponentsWithSolutionSetFirstITCase.java |   19 +-
 .../test/iterative/DanglingPageRankITCase.java  |   83 +-
 ...terationNotDependingOnSolutionSetITCase.java |   22 +-
 .../DependencyConnectedComponentsITCase.java    |  184 +-
 .../iterative/EmptyWorksetIterationITCase.java  |   25 +-
 .../test/iterative/IdentityIterationITCase.java |   23 +-
 ...nIncompleteDynamicPathConsumptionITCase.java |   39 +-
 ...onIncompleteStaticPathConsumptionITCase.java |   39 +-
 ...IterationTerminationWithTerminationTail.java |   10 +-
 .../IterationTerminationWithTwoTails.java       |   11 +-
 .../IterationWithAllReducerITCase.java          |    6 +-
 .../iterative/IterationWithChainingITCase.java  |   15 +-
 .../iterative/IterationWithUnionITCase.java     |   24 +-
 .../iterative/KMeansWithBroadcastSetITCase.java |    7 +-
 .../MultipleSolutionSetJoinsITCase.java         |   37 +-
 .../iterative/SolutionSetDuplicatesITCase.java  |    6 +-
 .../StaticlyNestedIterationsITCase.java         |   35 +-
 .../UnionStaticDynamicIterationITCase.java      |   27 +-
 .../AggregatorConvergenceITCase.java            |  294 ++--
 .../aggregators/AggregatorsITCase.java          |   67 +-
 .../test/javaApiOperators/AggregateITCase.java  |  180 --
 .../CoGroupGroupSortITCase.java                 |  122 --
 .../test/javaApiOperators/CoGroupITCase.java    |  992 -----------
 .../test/javaApiOperators/CrossITCase.java      |  456 -----
 .../CustomDistributionITCase.java               |  359 ----
 .../test/javaApiOperators/DataSinkITCase.java   |  355 ----
 .../test/javaApiOperators/DataSourceITCase.java |   81 -
 .../test/javaApiOperators/DistinctITCase.java   |  318 ----
 .../ExecutionEnvironmentITCase.java             |   95 -
 .../test/javaApiOperators/FilterITCase.java     |  327 ----
 .../test/javaApiOperators/FirstNITCase.java     |  151 --
 .../test/javaApiOperators/FlatMapITCase.java    |  360 ----
 .../javaApiOperators/GroupCombineITCase.java    |  482 ------
 .../javaApiOperators/GroupReduceITCase.java     | 1636 ------------------
 .../flink/test/javaApiOperators/JoinITCase.java |  938 ----------
 .../flink/test/javaApiOperators/MapITCase.java  |  514 ------
 .../javaApiOperators/MapPartitionITCase.java    |  101 --
 .../javaApiOperators/ObjectReuseITCase.java     |  216 ---
 .../test/javaApiOperators/OuterJoinITCase.java  |  680 --------
 .../test/javaApiOperators/PartitionITCase.java  |  848 ---------
 .../test/javaApiOperators/ProjectITCase.java    |   64 -
 .../test/javaApiOperators/ReduceITCase.java     |  512 ------
 .../ReduceWithCombinerITCase.java               |  313 ----
 .../RemoteEnvironmentITCase.java                |  153 --
 .../ReplicatingDataSourceITCase.java            |  121 --
 .../test/javaApiOperators/SampleITCase.java     |  167 --
 .../javaApiOperators/SortPartitionITCase.java   |  343 ----
 .../test/javaApiOperators/SumMinMaxITCase.java  |  103 --
 .../test/javaApiOperators/TypeHintITCase.java   |  326 ----
 .../test/javaApiOperators/UnionITCase.java      |  132 --
 .../util/CollectionDataSets.java                |  725 --------
 .../util/ValueCollectionDataSets.java           |  730 --------
 .../test/manual/CheckForbiddenMethodsUsage.java |    7 +-
 .../HashTableRecordWidthCombinations.java       |   19 +-
 .../flink/test/manual/MassiveStringSorting.java |  161 +-
 .../test/manual/MassiveStringValueSorting.java  |  166 +-
 .../test/manual/NotSoMiniClusterIterations.java |    8 +-
 .../flink/test/manual/OverwriteObjects.java     |    7 +-
 .../flink/test/manual/ReducePerformance.java    |    7 +-
 .../manual/StreamingScalabilityAndLatency.java  |   48 +-
 .../apache/flink/test/manual/package-info.java  |    3 +-
 .../flink/test/misc/AutoParallelismITCase.java  |    8 +-
 .../test/misc/CustomPartitioningITCase.java     |   22 +-
 .../test/misc/CustomSerializationITCase.java    |   30 +-
 .../flink/test/misc/GenericTypeInfoTest.java    |   13 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   32 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |   26 +-
 .../flink/test/operators/AggregateITCase.java   |  183 ++
 .../test/operators/CoGroupGroupSortITCase.java  |  125 ++
 .../flink/test/operators/CoGroupITCase.java     |  989 +++++++++++
 .../flink/test/operators/CrossITCase.java       |  457 +++++
 .../operators/CustomDistributionITCase.java     |  362 ++++
 .../flink/test/operators/DataSinkITCase.java    |  356 ++++
 .../flink/test/operators/DataSourceITCase.java  |   82 +
 .../flink/test/operators/DistinctITCase.java    |  322 ++++
 .../operators/ExecutionEnvironmentITCase.java   |   95 +
 .../flink/test/operators/FilterITCase.java      |  331 ++++
 .../flink/test/operators/FirstNITCase.java      |  156 ++
 .../flink/test/operators/FlatMapITCase.java     |  364 ++++
 .../test/operators/GroupCombineITCase.java      |  484 ++++++
 .../flink/test/operators/GroupReduceITCase.java | 1633 +++++++++++++++++
 .../apache/flink/test/operators/JoinITCase.java |  945 ++++++++++
 .../apache/flink/test/operators/MapITCase.java  |  518 ++++++
 .../test/operators/MapPartitionITCase.java      |  101 ++
 .../flink/test/operators/ObjectReuseITCase.java |  215 +++
 .../flink/test/operators/OuterJoinITCase.java   |  682 ++++++++
 .../flink/test/operators/PartitionITCase.java   |  847 +++++++++
 .../flink/test/operators/ProjectITCase.java     |   67 +
 .../flink/test/operators/ReduceITCase.java      |  515 ++++++
 .../operators/ReduceWithCombinerITCase.java     |  317 ++++
 .../test/operators/RemoteEnvironmentITCase.java |  157 ++
 .../operators/ReplicatingDataSourceITCase.java  |  118 ++
 .../flink/test/operators/SampleITCase.java      |  171 ++
 .../test/operators/SortPartitionITCase.java     |  347 ++++
 .../flink/test/operators/SumMinMaxITCase.java   |  108 ++
 .../flink/test/operators/TypeHintITCase.java    |  330 ++++
 .../flink/test/operators/UnionITCase.java       |  136 ++
 .../test/operators/util/CollectionDataSets.java |  772 +++++++++
 .../operators/util/ValueCollectionDataSets.java |  775 +++++++++
 .../examples/KMeansSingleStepTest.java          |   80 +-
 .../examples/RelationalQueryCompilerTest.java   |  156 +-
 .../examples/WordCountCompilerTest.java         |   20 +-
 .../ConnectedComponentsCoGroupTest.java         |   64 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |   82 +-
 .../iterations/PageRankCompilerTest.java        |   63 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   39 +-
 .../jsonplan/JsonJobGraphGenerationTest.java    |   87 +-
 .../optimizer/jsonplan/PreviewPlanDumpTest.java |   39 +-
 .../query/AbstractQueryableStateITCase.java     |   52 +-
 .../KVStateRequestSerializerRocksDBTest.java    |   15 +-
 .../query/QueryableStateITCaseFsBackend.java    |    1 +
 .../QueryableStateITCaseRocksDBBackend.java     |    1 +
 ...ctTaskManagerProcessFailureRecoveryTest.java |   33 +-
 .../flink/test/recovery/FastFailuresITCase.java |   15 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   72 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   34 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   24 +-
 .../recovery/ProcessFailureCancelingITCase.java |   60 +-
 ...SimpleRecoveryFailureRateStrategyITBase.java |    4 +
 ...RecoveryFixedDelayRestartStrategyITBase.java |    4 +
 .../test/recovery/SimpleRecoveryITCaseBase.java |   22 +-
 .../TaskManagerFailureRecoveryITCase.java       |   38 +-
 ...anagerProcessFailureBatchRecoveryITCase.java |    7 +-
 ...erProcessFailureStreamingRecoveryITCase.java |   24 +-
 ...ConsumePipelinedAndBlockingResultITCase.java |    5 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |   36 +-
 .../flink/test/runtime/JoinDeadlockITCase.java  |    3 +-
 .../runtime/NetworkStackThroughputITCase.java   |   15 +-
 .../RegisterTypeWithKryoSerializerITCase.java   |   16 +-
 .../test/runtime/SelfJoinDeadlockITCase.java    |    3 +-
 .../ZooKeeperLeaderElectionITCase.java          |   19 +-
 .../LocalFlinkMiniClusterITCase.java            |   43 +-
 .../test/state/ManualWindowSpeedITCase.java     |    4 +-
 .../state/StateHandleSerializationTest.java     |   20 +-
 .../AbstractOperatorRestoreTestBase.java        |   21 +-
 .../state/operator/restore/ExecutionMode.java   |    3 +-
 .../restore/keyed/KeyedComplexChainTest.java    |    4 +
 .../state/operator/restore/keyed/KeyedJob.java  |    4 +-
 ...AbstractNonKeyedOperatorRestoreTestBase.java |    1 +
 .../restore/unkeyed/ChainBreakTest.java         |    1 +
 .../unkeyed/ChainLengthDecreaseTest.java        |    1 +
 .../unkeyed/ChainLengthIncreaseTest.java        |    1 +
 .../restore/unkeyed/ChainOrderTest.java         |    1 +
 .../restore/unkeyed/ChainUnionTest.java         |    1 +
 .../operator/restore/unkeyed/NonKeyedJob.java   |    4 +-
 .../streaming/api/StreamingOperatorsITCase.java |   31 +-
 .../api/outputformat/CsvOutputFormatITCase.java |    3 +
 .../outputformat/TextOutputFormatITCase.java    |    3 +
 .../runtime/ChainedRuntimeContextITCase.java    |    3 +
 .../streaming/runtime/CoGroupJoinITCase.java    |   18 +-
 .../test/streaming/runtime/CoStreamITCase.java  |    8 +-
 .../streaming/runtime/DataStreamPojoITCase.java |   65 +-
 .../streaming/runtime/DirectedOutputITCase.java |    3 +
 .../test/streaming/runtime/IterateITCase.java   |  115 +-
 .../streaming/runtime/OutputSplitterITCase.java |    3 +
 .../streaming/runtime/PartitionerITCase.java    |    2 -
 .../streaming/runtime/SelfConnectionITCase.java |    4 +-
 .../streaming/runtime/SideOutputITCase.java     |   25 +-
 .../streaming/runtime/StateBackendITCase.java   |    9 +-
 .../runtime/StreamTaskTimerITCase.java          |   22 +-
 .../test/streaming/runtime/TimestampITCase.java |  156 +-
 .../streaming/runtime/WindowFoldITCase.java     |    3 +-
 .../runtime/util/EvenOddOutputSelector.java     |    4 +
 .../test/streaming/runtime/util/NoOpIntMap.java |    4 +
 .../runtime/util/ReceiveCheckNoOpSink.java      |    6 +
 .../runtime/util/TestListResultSink.java        |    9 +-
 .../streaming/runtime/util/TestListWrapper.java |    5 +-
 .../flink/test/testfunctions/Tokenizer.java     |    3 +
 .../PojoSerializerUpgradeTest.java              |   23 +-
 .../org/apache/flink/test/util/CoordVector.java |   24 +-
 .../flink/test/util/DataSetUtilsITCase.java     |   36 +-
 .../test/util/InfiniteIntegerInputFormat.java   |    7 +-
 .../util/InfiniteIntegerTupleInputFormat.java   |    7 +-
 .../apache/flink/test/util/PointFormatter.java  |    6 +-
 .../apache/flink/test/util/PointInFormat.java   |    8 +-
 .../org/apache/flink/test/util/TestUtils.java   |    7 +-
 .../UniformIntTupleGeneratorInputFormat.java    |    8 +-
 .../sessionwindows/EventGenerator.java          |    2 +-
 .../sessionwindows/EventGeneratorFactory.java   |    3 +-
 .../sessionwindows/GeneratorConfiguration.java  |   22 +-
 .../sessionwindows/GeneratorEventFactory.java   |   15 +-
 .../sessionwindows/LongRandomGenerator.java     |    7 +-
 .../ParallelSessionsEventGenerator.java         |   14 +-
 .../sessionwindows/SessionConfiguration.java    |    4 +-
 .../windowing/sessionwindows/SessionEvent.java  |    2 +-
 .../SessionEventGeneratorImpl.java              |   12 +-
 .../SessionGeneratorConfiguration.java          |    2 +-
 .../sessionwindows/SessionWindowITCase.java     |   12 +-
 .../sessionwindows/TestEventPayload.java        |    4 +-
 .../scala/operators/GroupCombineITCase.scala    |    2 +-
 295 files changed, 17887 insertions(+), 17270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index aac7e11..0def39a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestB
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
 
 import org.apache.calcite.tools.RuleSets;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 6c1a753..29fbdf5 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index f4e5daf..6a72b3e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index beac803..3f67a88 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -338,6 +338,41 @@ under the License.
 				</executions>
 			</plugin>
 
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+						Execute checkstyle after compilation but before tests.
+
+						This ensures that any parsing or type checking errors are from
+						javac, so they look as expected. Beyond that, we want to
+						fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
 			<!-- Scala Code Style, most of the configuration done via plugin management -->
 			<plugin>
 				<groupId>org.scalastyle</groupId>

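The plugin execution above binds the checkstyle "check" goal to the
test-compile phase and, with includeTestSourceDirectory enabled, applies the
strict ruleset to the test sources of flink-tests, so violations fail the
build before any test runs. The same check can also be run ahead of time with
"mvn checkstyle:check". As a rough, hypothetical illustration of the layout
the ruleset expects (mirroring the patterns applied in the hunks below; class
and method names are invented and not part of this commit):

// Illustrative sketch only, not from this commit: a test class laid out the
// way the strict checkstyle rules expect (import groups separated by blank
// lines, class-level javadoc ending with a period, private helper classes).
package org.apache.flink.test.example;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

/**
 * Example test class following the strict checkstyle conventions.
 *
 * <p>Follow-up javadoc paragraphs start with a paragraph tag, and the first
 * sentence ends with a period.
 */
public class StrictCheckstyleExampleITCase extends TestLogger {

	@Test
	public void testCollectSmallDataSet() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		List<Integer> expected = Arrays.asList(1, 2, 3);
		assertEquals(expected, env.fromElements(1, 2, 3).collect());
	}

	/**
	 * Helper classes are declared private instead of public.
	 */
	private static class Helper {
	}
}
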
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 25d9228..4de1602 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -32,18 +31,18 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 import static org.junit.Assert.fail;
 
 /**
- * Tests cases where Accumulator are
+ * Tests cases where accumulators:
  *  a) throw errors during runtime
- *  b) is not compatible with existing accumulator
+ *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
 
@@ -91,7 +90,6 @@ public class AccumulatorErrorITCase extends TestLogger {
 		}
 	}
 
-
 	@Test
 	public void testInvalidTypeAccumulator() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 5f2b0a9..3e35bd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.accumulators;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -38,16 +34,20 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
-import org.junit.Assert;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.junit.Assert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Test for the basic functionality of accumulators. We cannot test all different
  * kinds of plans here (iterative, etc.).
- * 
- * TODO Test conflict when different UDFs write to accumulator with same name
+ *
+ * <p>TODO Test conflict when different UDFs write to accumulator with same name
  * but with different type. The conflict will occur in JobManager while merging.
  */
 @SuppressWarnings("serial")
@@ -60,31 +60,31 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 	private String resultPath;
 
 	private JobExecutionResult result;
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", INPUT);
 		resultPath = getTempFilePath("result");
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED, resultPath);
-		
+
 		// Test accumulator results
 		System.out.println("Accumulator results:");
 		JobExecutionResult res = this.result;
 		System.out.println(AccumulatorHelper.getResultsFormatted(res.getAllAccumulatorResults()));
 
-		Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines"));
+		Assert.assertEquals(Integer.valueOf(3), res.getAccumulatorResult("num-lines"));
+
+		Assert.assertEquals(Double.valueOf(getParallelism()), res.getAccumulatorResult("open-close-counter"));
 
-		Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
-		
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();
 		dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
 		Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
-		
+
 		// Test distinct words (custom accumulator)
 		Set<StringValue> distinctWords = Sets.newHashSet();
 		distinctWords.add(new StringValue("one"));
@@ -96,18 +96,18 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<String> input = env.readTextFile(dataPath); 
-		
+
+		DataSet<String> input = env.readTextFile(dataPath);
+
 		input.flatMap(new TokenizeLine())
 			.groupBy(0)
 			.reduceGroup(new CountWords())
 			.writeAsCsv(resultPath, "\n", " ");
-		
+
 		this.result = env.execute();
 	}
-	
-	public static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+
+	private static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 		// Needs to be instantiated later since the runtime context is not yet
 		// initialized at this place
@@ -120,7 +120,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 
 		@Override
 		public void open(Configuration parameters) {
-		  
+
 			// Add counters using convenience functions
 			this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
 			this.wordsPerLineDistribution = getRuntimeContext().getHistogram("words-per-line");
@@ -157,20 +157,20 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 			// Test counter used in open() and closed()
 			this.openCloseCounter.add(0.5);
 		}
-		
+
 		@Override
 		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			this.cntNumLines.add(1);
 			int wordsPerLine = 0;
-			
+
 			for (String token : value.toLowerCase().split("\\W+")) {
 				distinctWords.add(new StringValue(token));
 				out.collect(new Tuple2<>(token, 1));
-				++ wordsPerLine;
+				++wordsPerLine;
 			}
 			wordsPerLineDistribution.add(wordsPerLine);
 		}
-		
+
 		@Override
 		public void close() throws Exception {
 			// Test counter used in open and close only
@@ -179,37 +179,35 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 		}
 	}
 
-	
-	public static class CountWords
+	private static class CountWords
 		extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-		implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-	{
-		
+		implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
 		private IntCounter reduceCalls;
 		private IntCounter combineCalls;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
 			this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
 		}
-		
+
 		@Override
 		public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			reduceCalls.add(1);
 			reduceInternal(values, out);
 		}
-		
+
 		@Override
 		public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			combineCalls.add(1);
 			reduceInternal(values, out);
 		}
-		
+
 		private void reduceInternal(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			int sum = 0;
 			String key = null;
-			
+
 			for (Tuple2<String, Integer> e : values) {
 				key = e.f0;
 				sum += e.f1;
@@ -217,9 +215,9 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 			out.collect(new Tuple2<>(key, sum));
 		}
 	}
-	
+
 	/**
-	 * Custom accumulator
+	 * Custom accumulator.
 	 */
 	public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index d86d517..3e19ce8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.accumulators;
 
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -27,13 +26,17 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 
-public class AccumulatorIterativeITCase extends JavaProgramTestBase {	
+/**
+ * Test accumulator within iteration.
+ */
+public class AccumulatorIterativeITCase extends JavaProgramTestBase {
 	private static final int NUM_ITERATIONS = 3;
 	private static final int NUM_SUBTASKS = 1;
 	private static final String ACC_NAME = "test";
-	
+
 	@Override
 	protected boolean skipCollectionExecution() {
 		return true;
@@ -43,20 +46,20 @@ public class AccumulatorIterativeITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(NUM_SUBTASKS);
-		
+
 		IterativeDataSet<Integer> iteration = env.fromElements(1, 2, 3).iterate(NUM_ITERATIONS);
-		
+
 		iteration.closeWith(iteration.reduceGroup(new SumReducer())).output(new DiscardingOutputFormat<Integer>());
 
 		Assert.assertEquals(NUM_ITERATIONS * 6, (int) env.execute().getAccumulatorResult(ACC_NAME));
 	}
-	
+
 	static final class SumReducer extends RichGroupReduceFunction<Integer, Integer> {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		private IntCounter testCounter = new IntCounter();
-		
+
 		@Override
 		public void open(Configuration config) throws Exception {
 			getRuntimeContext().addAccumulator(ACC_NAME, this.testCounter);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 92e5768..756b81e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.test.accumulators;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -55,39 +49,42 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.fail;
 
 /**
  * Tests the availability of accumulator results during runtime. The test case tests a user-defined
  * accumulator and Flink's internal accumulators for two consecutive tasks.
  *
- * CHAINED[Source -> Map] -> Sink
+ * <p>CHAINED[Source -> Map] -> Sink
  *
- * Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
+ * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
  * the task to the task manager which notifies the job manager and sends the current accumulators.
  * The task blocks until the test has been notified about the current accumulator values.
  *
- * A barrier between the operators ensures that that pipelining is disabled for the streaming test.
+ * <p>A barrier between the operators ensures that that pipelining is disabled for the streaming test.
  * The batch job reads the records one at a time. The streaming code buffers the records beforehand;
  * that's why exact guarantees about the number of records read are very hard to make. Thus, why we
  * check for an upper bound of the elements read.
@@ -104,7 +101,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 	private static JobGraph jobGraph;
 
 	// name of user accumulator
-	private static String ACCUMULATOR_NAME = "test";
+	private static final String ACCUMULATOR_NAME = "test";
 
 	// number of heartbeat intervals to check
 	private static final int NUM_ITERATIONS = 5;
@@ -113,7 +110,6 @@ public class AccumulatorLiveITCase extends TestLogger {
 
 	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
 
-
 	@Before
 	public void before() throws Exception {
 		system = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -129,8 +125,8 @@ public class AccumulatorLiveITCase extends TestLogger {
 		taskManager = testingCluster.getTaskManagersAsJava().get(0);
 
 		// generate test data
-		for (int i=0; i < NUM_ITERATIONS; i++) {
-			inputData.add(i, String.valueOf(i+1));
+		for (int i = 0; i < NUM_ITERATIONS; i++) {
+			inputData.add(i, String.valueOf(i + 1));
 		}
 
 		NotifyingMapper.finished = false;
@@ -163,7 +159,6 @@ public class AccumulatorLiveITCase extends TestLogger {
 		verifyResults();
 	}
 
-
 	@Test
 	public void testStreaming() throws Exception {
 
@@ -175,14 +170,12 @@ public class AccumulatorLiveITCase extends TestLogger {
 				.flatMap(new NotifyingMapper())
 				.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
 
-
 		jobGraph = env.getStreamGraph().getJobGraph();
 		jobID = jobGraph.getJobID();
 
 		verifyResults();
 	}
 
-
 	private static void verifyResults() {
 		new JavaTestKit(system) {{
 
@@ -201,7 +194,6 @@ public class AccumulatorLiveITCase extends TestLogger {
 					selfGateway);
 			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
 
-
 			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
 			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
 
@@ -210,7 +202,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 			ExecutionAttemptID sinkTaskID = null;
 
 			/* Check for accumulator values */
-			if(checkUserAccumulators(0, userAccumulators)) {
+			if (checkUserAccumulators(0, userAccumulators)) {
 				LOG.info("Passed initial check for map task.");
 			} else {
 				fail("Wrong accumulator results when map task begins execution.");
@@ -244,7 +236,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
 			userAccumulators = msg.userAccumulators();
 
-			if(checkUserAccumulators(expectedAccVal, userAccumulators)) {
+			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
 				LOG.info("Passed initial check for sink task.");
 			} else {
 				fail("Wrong accumulator results when sink task begins execution.");
@@ -272,14 +264,13 @@ public class AccumulatorLiveITCase extends TestLogger {
 		}};
 	}
 
-
-	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
+	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
 		LOG.info("checking user accumulators");
-		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
 	}
 
 	/**
-	 * UDF that notifies when it changes the accumulator values
+	 * UDF that notifies when it changes the accumulator values.
 	 */
 	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
 		private static final long serialVersionUID = 1L;
@@ -356,7 +347,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 	}
 
 	/**
-	 * Helpers to generate the JobGraph
+	 * Helpers to generate the JobGraph.
 	 */
 	private static JobGraph getOptimizedPlan(Plan plan) {
 		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
@@ -376,7 +367,6 @@ public class AccumulatorLiveITCase extends TestLogger {
 		}
 	}
 
-
 	/**
 	 * This is used to for creating the example topology. {@link #execute} is never called, we
 	 * only use this to call {@link #getStreamGraph()}.

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
index e742c27..737bdfb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -21,17 +21,18 @@ package org.apache.flink.test.actions;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests the methods that bring elements back to the client driver program.
  */
@@ -64,7 +65,6 @@ public class CountCollectITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableObjectReuse();
 
-
 		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 		DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index 5f2950c..720784e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.broadcastvars;
 
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -35,6 +32,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test broadcast input after branching.
+ */
 public class BroadcastBranchingITCase extends JavaProgramTestBase {
 	private static final String RESULT = "(2,112)\n";
 
@@ -63,16 +66,16 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
 				.fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));
 
 		// Jn1 matches x and y values on id and emits (id, x, y) triples
-		JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1
-				= sc2.join(sc3).where(0).equalTo(0).with(new Jn1());
+		JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1 =
+				sc2.join(sc3).where(0).equalTo(0).with(new Jn1());
 
 		// Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
-		JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2
-				= jn1.join(sc1).where(0).equalTo(0).with(new Jn2());
+		JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2 =
+				jn1.join(sc1).where(0).equalTo(0).with(new Jn2());
 
 		// Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
-		FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1
-				= jn1.flatMap(new Mp1());
+		FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1 =
+				jn1.flatMap(new Mp1());
 
 		// Mp2 filters out all p values which can be divided by z
 		List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();
@@ -80,7 +83,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
 		JavaProgramTestBase.compareResultAsText(result, RESULT);
 	}
 
-	public static class Jn1 implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+	private static class Jn1 implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -89,7 +92,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public static class Jn2 implements JoinFunction<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> {
+	private static class Jn2 implements JoinFunction<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		private static int p(int x, int a, int b, int c) {
@@ -104,14 +107,14 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
 			int b = second.f2;
 			int c = second.f3;
 
-			int p_x = p(x, a, b, c);
-			int p_y = p(y, a, b, c);
-			int min = Math.min(p_x, p_y);
+			int pX = p(x, a, b, c);
+			int pY = p(y, a, b, c);
+			int min = Math.min(pX, pY);
 			return new Tuple2<>(first.f0, min);
 		}
 	}
 
-	public static class Mp1 implements FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
+	private static class Mp1 implements FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -122,7 +125,7 @@ public class BroadcastBranchingITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public static class Mp2 extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+	private static class Mp2 extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		private Collection<Tuple2<String, Integer>> zs;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
index 113a330..79b0033 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
@@ -15,17 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.broadcastvars;
 
-import java.util.List;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
+
 import org.junit.Assert;
 
+import java.util.List;
+
+/**
+ * Test broadcast input after union.
+ */
 public class BroadcastUnionITCase extends JavaProgramTestBase {
 	private static final String BC_NAME = "bc";
 
@@ -43,11 +49,11 @@ public class BroadcastUnionITCase extends JavaProgramTestBase {
 				.withBroadcastSet(bc1.union(bc2), BC_NAME)
 				.reduce(new Reducer())
 				.collect();
-		
+
 		Assert.assertEquals(Long.valueOf(3025), result.get(0));
 	}
 
-	public static class Mapper extends RichMapFunction<Long, Long> {
+	private static class Mapper extends RichMapFunction<Long, Long> {
 		private List<Long> values;
 
 		@Override
@@ -65,7 +71,7 @@ public class BroadcastUnionITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public static class Reducer implements ReduceFunction<Long> {
+	private static class Reducer implements ReduceFunction<Long> {
 		@Override
 		public Long reduce(Long value1, Long value2) throws Exception {
 			return value1 + value2;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
index 41d24b8..26f5763 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.broadcastvars;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -31,36 +27,43 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Test {@link BroadcastVariableInitializer}.
+ */
 @SuppressWarnings("serial")
 public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
-	
+
 	@Override
 	protected void testProgram() throws Exception {
-		
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(4);
-		
+
 		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
-		
+
 		IterativeDataSet<Integer> iteration = data.iterate(10);
-		
+
 		DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");
-		
+
 		final List<Integer> resultList = new ArrayList<Integer>();
 		iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));
-		
+
 		env.execute();
-		
+
 		Assert.assertEquals(8, resultList.get(0).intValue());
 	}
 
-	
-	public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
-		
+	private static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
+
 		private Integer bcValue;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			this.bcValue = getRuntimeContext().getBroadcastVariableWithInitializer("bc", new PickFirstInitializer());
@@ -72,8 +75,8 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
 				return;
 			}
 			final int x = bcValue;
-			
-			for (Integer y : records) { 
+
+			for (Integer y : records) {
 				if (y > x) {
 					out.collect(y);
 					return;
@@ -83,8 +86,8 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
 			out.collect(bcValue);
 		}
 	}
-	
-	public static class PickFirstInitializer implements BroadcastVariableInitializer<Integer, Integer> {
+
+	private static class PickFirstInitializer implements BroadcastVariableInitializer<Integer, Integer> {
 
 		@Override
 		public Integer initializeBroadcastVariable(Iterable<Integer> data) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 2767312..88c921d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.cancelling;
 
 import org.apache.flink.api.common.Plan;
@@ -35,31 +34,33 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 
 /**
- * 
+ * Base class for testing job cancellation.
  */
 public abstract class CancelingTestBase extends TestLogger {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
 
 	private static final int MINIMUM_HEAP_SIZE_MB = 192;
-	
+
 	/**
 	 * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
 	 * is canceled), starting from the point in time when the cancel request is issued.
@@ -69,13 +70,13 @@ public abstract class CancelingTestBase extends TestLogger {
 	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected LocalFlinkMiniCluster executor;
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void verifyJvmOptions() {
 		final long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
@@ -112,7 +113,7 @@ public abstract class CancelingTestBase extends TestLogger {
 	public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
 		runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
 	}
-		
+
 	public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
 		try {
 			// submit job

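The helpers kept in this base class are what the cancellation ITCases below build on: a subclass assembles a Plan whose input effectively never finishes, then calls runAndCancelJob() with the two timeouts. A minimal sketch of such a subclass, assuming only the members visible in this diff (runAndCancelJob, setTaskManagerNumSlots); the job body itself is illustrative.

    // imports: org.apache.flink.api.common.functions.MapFunction,
    //          org.apache.flink.api.java.ExecutionEnvironment,
    //          org.apache.flink.api.java.io.DiscardingOutputFormat, org.junit.Test
    public class SimpleMapCancelingITCase extends CancelingTestBase {

        @Test
        public void testCancelSimpleMapJob() throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            // an effectively unbounded source keeps the job running until it is canceled
            env.generateSequence(1, Long.MAX_VALUE)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value;
                    }
                })
                .output(new DiscardingOutputFormat<Long>());

            // request the cancel after ~5s and expect the job to be gone within ~10s
            runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
        }
    }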
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index d797e47..5e21129 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.cancelling;
 
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -31,13 +30,16 @@ import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
 import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
 
+/**
+ * Test job cancellation from within a JoinFunction.
+ */
 public class JoinCancelingITCase extends CancelingTestBase {
 	private static final int parallelism = 4;
 
 	public JoinCancelingITCase() {
 		setTaskManagerNumSlots(parallelism);
 	}
-	
+
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
 	private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
 		executeTask(joiner, slow, parallelism);
@@ -68,7 +70,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
 	public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
 		executeTask(new SimpleMatcher<Integer>(), false);
 	}
-	
+
 //	@Test
 	public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
 		executeTask(new StuckInOpenMatcher<Integer>(), false);
@@ -92,45 +94,45 @@ public class JoinCancelingITCase extends CancelingTestBase {
 
 		runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
 	}
-	
+
 //	@Test
 	public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
 		executeTaskWithGenerator(new SimpleMatcher<Integer>(), 50000, 100, 30 * 1000, 30 * 1000);
 	}
 
 	// --------------- Test Sort Matches that are canceled while in the Matching Phase -----------------
-	
+
 //	@Test
 	public void testCancelSortMatchWhileJoining() throws Exception {
 		executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 * 1000, 20 * 1000);
 	}
-	
+
 //	@Test
 	public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
 		executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 * 1000);
 	}
 
 	// -------------------------------------- Test System corner cases ---------------------------------
-	
+
 //	@Test
 	public void testCancelSortMatchWithHighparallelism() throws Exception {
 		executeTask(new SimpleMatcher<Integer>(), false, 64);
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class SimpleMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+	private static final class SimpleMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
 			return new Tuple2<>(first.f0, second.f0);
 		}
 	}
-	
-	public static final class DelayingMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+	private static final class DelayingMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private static final int WAIT_TIME_PER_RECORD = 10 * 1000; // 10 sec.
 
 		@Override
@@ -139,12 +141,12 @@ public class JoinCancelingITCase extends CancelingTestBase {
 			return new Tuple2<>(first.f0, second.f0);
 		}
 	}
-	
-	public static final class LongCancelTimeMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+	private static final class LongCancelTimeMatcher<IN> implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private static final int WAIT_TIME_PER_RECORD = 5 * 1000; // 5 sec.
-		
+
 		@Override
 		public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
 			final long start = System.currentTimeMillis();
@@ -157,10 +159,10 @@ public class JoinCancelingITCase extends CancelingTestBase {
 			return new Tuple2<>(first.f0, second.f0);
 		}
 	}
-	
-	public static final class StuckInOpenMatcher<IN> extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
+
+	private static final class StuckInOpenMatcher<IN> extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			synchronized (this) {

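For orientation, the matchers above are plugged into an ordinary DataSet join; the wiring (not visible in this diff) looks roughly like the sketch below, with inline data standing in for the infinite or generated inputs the test actually uses.

    // imports: org.apache.flink.api.common.functions.JoinFunction,
    //          org.apache.flink.api.java.DataSet, org.apache.flink.api.java.ExecutionEnvironment,
    //          org.apache.flink.api.java.io.DiscardingOutputFormat,
    //          org.apache.flink.api.java.tuple.Tuple2
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> left = env.fromElements(Tuple2.of(1, 1), Tuple2.of(2, 2));
    DataSet<Tuple2<Integer, Integer>> right = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 20));

    left.join(right)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> join(Tuple2<Integer, Integer> first, Tuple2<Integer, Integer> second) {
                return new Tuple2<>(first.f0, second.f0);
            }
        })
        .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());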
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 03b1a24..3a7039f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -24,30 +24,34 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.InfiniteIntegerInputFormat;
+
 import org.junit.Test;
 
+/**
+ * Test job cancellation from within a MapFunction.
+ */
 public class MapCancelingITCase extends CancelingTestBase {
 	private static final int parallelism = 4;
 
 	public MapCancelingITCase() {
 		setTaskManagerNumSlots(parallelism);
 	}
-	
+
 	@Test
 	public void testMapCancelling() throws Exception {
 		executeTask(new IdentityMapper<Integer>());
 	}
-	
+
 	@Test
 	public void testSlowMapCancelling() throws Exception {
 		executeTask(new DelayingIdentityMapper<Integer>());
 	}
-	
+
 	@Test
 	public void testMapWithLongCancellingResponse() throws Exception {
 		executeTask(new LongCancelTimeIdentityMapper<Integer>());
 	}
-	
+
 	@Test
 	public void testMapPriorToFirstRecordReading() throws Exception {
 		executeTask(new StuckInOpenIdentityMapper<Integer>());
@@ -67,8 +71,8 @@ public class MapCancelingITCase extends CancelingTestBase {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class IdentityMapper<IN> implements MapFunction<IN, IN> {
+
+	private static final class IdentityMapper<IN> implements MapFunction<IN, IN> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -76,8 +80,8 @@ public class MapCancelingITCase extends CancelingTestBase {
 			return value;
 		}
 	}
-	
-	public static final class DelayingIdentityMapper<IN> implements MapFunction<IN, IN> {
+
+	private static final class DelayingIdentityMapper<IN> implements MapFunction<IN, IN> {
 		private static final long serialVersionUID = 1L;
 
 		private static final int WAIT_TIME_PER_VALUE = 10 * 1000; // 10 sec.
@@ -88,8 +92,8 @@ public class MapCancelingITCase extends CancelingTestBase {
 			return value;
 		}
 	}
-	
-	public static final class LongCancelTimeIdentityMapper<IN> implements MapFunction<IN, IN> {
+
+	private static final class LongCancelTimeIdentityMapper<IN> implements MapFunction<IN, IN> {
 		private static final long serialVersionUID = 1L;
 
 		private static final int WAIT_TIME_PER_VALUE = 5 * 1000; // 5 sec.
@@ -108,8 +112,8 @@ public class MapCancelingITCase extends CancelingTestBase {
 			return value;
 		}
 	}
-	
-	public static final class StuckInOpenIdentityMapper<IN> extends RichMapFunction<IN, IN> {
+
+	private static final class StuckInOpenIdentityMapper<IN> extends RichMapFunction<IN, IN> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

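The delaying and stuck-in-open mappers made private above follow a common cancellation-test idiom: block (or sleep per record) on the task thread so the job is still running when the cancel request arrives, and rely on the interrupt raised by cancellation to break out. A minimal sketch of the sleeping variant; the body is illustrative, not the exact code in this file.

    // import: org.apache.flink.api.common.functions.MapFunction
    private static final class SleepingMapper<T> implements MapFunction<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public T map(T value) throws Exception {
            // Cancellation interrupts the task thread; the InterruptedException
            // thrown out of sleep() lets the task wind down promptly.
            Thread.sleep(10 * 1000);
            return value;
        }
    }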
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 0021b81..269b126 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -169,11 +170,11 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
-		final int WINDOW_SIZE = windowSize();
-		final int NUM_KEYS = numKeys();
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
 		FailingSource.reset();
-		
+
 		try {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -183,10 +184,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setStateBackend(this.stateBackend);
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
 					.keyBy(0)
-					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
 					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
@@ -217,8 +218,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -239,9 +239,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
-		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
-		final int WINDOW_SIZE = windowSize();
-		final int NUM_KEYS = numKeys();
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -254,10 +254,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setStateBackend(this.stateBackend);
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
 					.keyBy(0)
-					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
 					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
@@ -292,8 +292,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 							out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
 						}
 					})
-					.addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -305,10 +304,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
-		final int WINDOW_SIZE = windowSize();
-		final int WINDOW_SLIDE = windowSlide();
-		final int NUM_KEYS = numKeys();
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int windowSlide = windowSlide();
+		final int numKeys = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -322,10 +321,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.getConfig().setUseSnapshotCompression(true);
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
 					.keyBy(0)
-					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
 					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
 
 						private boolean open = false;
@@ -356,8 +355,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -369,9 +367,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testPreAggregatedTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
-		final int WINDOW_SIZE = windowSize();
-		final int NUM_KEYS = numKeys();
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -383,10 +381,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setStateBackend(this.stateBackend);
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
 					.keyBy(0)
-					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
 					.reduce(
 							new ReduceFunction<Tuple2<Long, IntType>>() {
 
@@ -425,8 +423,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -438,10 +435,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testPreAggregatedSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
-		final int WINDOW_SIZE = windowSize();
-		final int WINDOW_SLIDE = windowSlide();
-		final int NUM_KEYS = numKeys();
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int windowSlide = windowSlide();
+		final int numKeys = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -453,10 +450,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setStateBackend(this.stateBackend);
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
 					.keyBy(0)
-					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
 					.reduce(
 							new ReduceFunction<Tuple2<Long, IntType>>() {
 
@@ -497,8 +494,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -508,14 +504,12 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener
-	{
+			implements ListCheckpointed<Integer>, CheckpointListener {
 		private static volatile boolean failedBefore = false;
 
 		private final int numKeys;
@@ -556,8 +550,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				}
 
 				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements))
-				{
+						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
 					// the function failed before, or we are in the elements before the failure
 					synchronized (ctx.getCheckpointLock()) {
 						int next = numElementsEmitted++;
@@ -669,7 +662,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
 
-
 			Integer curr = windowCounts.get(value.f0);
 			if (curr != null) {
 				windowCounts.put(value.f0, curr + 1);
@@ -756,7 +748,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				windowCounts.put(value.f0, 1);
 			}
 
-
 			// verify the contents of that window, the contents should be:
 			// (key + num windows so far)
 
@@ -799,13 +790,15 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	public static class IntType {
+	private static class IntType {
 
 		public int value;
 
 		public IntType() {}
 
-		public IntType(int value) { this.value = value; }
+		public IntType(int value) {
+			this.value = value;
+		}
 	}
 
 	protected int numElementsPerKey() {

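All of the window tests renamed above share one skeleton: enable checkpointing, key the stream, apply an event-time window, and validate the result downstream. A compact sketch of that skeleton with inline data and a printing sink; everything below is illustrative, while the real tests use a failing source and a validating sink.

    // imports: org.apache.flink.api.java.tuple.Tuple2,
    //          org.apache.flink.streaming.api.TimeCharacteristic,
    //          org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
    //          org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor,
    //          org.apache.flink.streaming.api.windowing.time.Time
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(100);

    env.fromElements(Tuple2.of(0L, 1), Tuple2.of(0L, 2), Tuple2.of(1L, 3))
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> element) {
                return element.f1; // treat the (ascending) value as the event timestamp
            }
        })
        .keyBy(0)
        .timeWindow(Time.milliseconds(100))
        .sum(1)
        .print();

    env.execute("Event-time tumbling window with checkpointing");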
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..f0db4d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for asynchronous file backend.
+ */
 public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public AsyncFileBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..70ec757 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for asynchronous memory backend.
+ */
 public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public AsyncMemBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 06d3ab0..d224905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -50,12 +50,10 @@ import static org.junit.Assert.assertTrue;
  * {@link org.apache.flink.test.checkpointing.StreamCheckpointingITCase} in that it contains
  * a TwoInput (or co-) Task.
  *
- * <p>
- * This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether
+ * <p>This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether
  * this barriers are correctly forwarded.
  *
- * <p>
- * The test triggers a failure after a while and verifies that, after completion, the
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
  * state reflects the "exactly once" semantics.
  */
 @SuppressWarnings({"serial", "deprecation"})
@@ -65,8 +63,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 	private static final int PARALLELISM = 4;
 
 	/**
-	 * Runs the following program:
-	 *
+	 * Runs the following program.
 	 * <pre>
 	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
@@ -136,7 +133,6 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		assertEquals(NUM_STRINGS, countSum);
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
@@ -145,7 +141,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 	 * A generating source that is slow before the first two checkpoints went through
 	 * and will indefinitely stall at a certain point to allow the checkpoint to complete.
 	 *
-	 * After the checkpoints are through, it continues with full speed.
+	 * <p>After the checkpoints are through, it continues with full speed.
 	 */
 	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
 			implements ListCheckpointed<Integer>, CheckpointListener {
@@ -174,7 +170,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
 			if (index < 0) {
 				// not been restored, so initialize
-				index =getRuntimeContext().getIndexOfThisSubtask();
+				index = getRuntimeContext().getIndexOfThisSubtask();
 			}
 
 			while (isRunning && index < numElements) {
@@ -243,11 +239,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 	}
 
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
 			implements ListCheckpointed<Long> {
 
-		static final long[] counts = new long[PARALLELISM];
-		
+		static long[] counts = new long[PARALLELISM];
+
 		private long count;
 
 		@Override
@@ -312,7 +308,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
 
-		static final long[] counts = new long[PARALLELISM];
+		static long[] counts = new long[PARALLELISM];
 
 		private long count = 0L;
 
@@ -342,8 +338,8 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> {
-		
-		static final long[] counts = new long[PARALLELISM];
+
+		static long[] counts = new long[PARALLELISM];
 
 		private long count;
 
@@ -374,7 +370,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 
 	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> {
 
-		static final long[] counts = new long[PARALLELISM];
+		static long[] counts = new long[PARALLELISM];
 
 		private long count;
 
@@ -409,6 +405,9 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 	}
 
+	/**
+	 * POJO storing a prefix, value, and count.
+	 */
 	public static class PrefixCount implements Serializable {
 
 		public String prefix;

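Several of the functions touched above snapshot a single counter through the ListCheckpointed interface and restore it by summing the restored list. A minimal sketch of that pattern; the names are illustrative, not copied from the test.

    // imports: java.util.Collections, java.util.List,
    //          org.apache.flink.api.common.functions.RichMapFunction,
    //          org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    private static class CountingMapper extends RichMapFunction<String, String>
            implements ListCheckpointed<Long> {

        private static final long serialVersionUID = 1L;

        private long count;

        @Override
        public String map(String value) {
            count++;
            return value;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // a single-element list holding this subtask's current count
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            count = 0;
            for (Long c : state) {
                count += c;
            }
        }
    }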
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 0ad42bb..280e11a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -30,10 +31,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -56,6 +57,9 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Test checkpointing while sourcing a continuous file processor.
+ */
 public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
 
 	private static final int NO_OF_FILES = 5;
@@ -68,7 +72,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 	private static String localFsURI;
 	private FileCreator fc;
 
-	private static  Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
+	private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
 
 	@BeforeClass
 	public static void createHDFS() {
@@ -78,10 +82,10 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 
 			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
 
-			localFsURI = "file:///" + baseDir +"/";
+			localFsURI = "file:///" + baseDir + "/";
 			localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
 
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			e.printStackTrace();
 			Assert.fail("Test failed " + e.getMessage());
 		}
@@ -279,7 +283,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 
 		public void run() {
 			try {
-				for(int i = 0; i < NO_OF_FILES; i++) {
+				for (int i = 0; i < NO_OF_FILES; i++) {
 					Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
 					long modTime;
 					do {
@@ -338,8 +342,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 
 		FSDataOutputStream stream = localFs.create(tmp);
 		StringBuilder str = new StringBuilder();
-		for(int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx + ": " + sampleLine + " " + i + "\n";
 			str.append(line);
 			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}