You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:21 UTC

[21/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index bda1679..fd4ecd4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -51,13 +52,14 @@ import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This verifies that checkpointing works correctly with event time windows.
  *
- * <p>
- * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
+ * <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
  */
 @SuppressWarnings("serial")
 public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@@ -68,7 +70,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static TestStreamEnvironment env;
 
-
 	@BeforeClass
 	public static void startTestCluster() {
 		Configuration config = new Configuration();
@@ -94,11 +95,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 1;
+		final int numElementsPerKey = 3000;
+		final int windowSize = 100;
+		final int numKeys = 1;
 		FailingSource.reset();
-		
+
 		try {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -107,11 +108,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_KEYS,
-							NUM_ELEMENTS_PER_KEY,
-							NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys,
+							numElementsPerKey,
+							numElementsPerKey / 3))
 					.rebalance()
-					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
 
 						private boolean open = false;
@@ -141,8 +142,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -154,10 +154,10 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 1000;
-		final int WINDOW_SLIDE = 100;
-		final int NUM_KEYS = 1;
+		final int numElementsPerKey = 3000;
+		final int windowSize = 1000;
+		final int windowSlide = 100;
+		final int numKeys = 1;
 		FailingSource.reset();
 
 		try {
@@ -168,9 +168,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
 					.rebalance()
-					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
 					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
 
 						private boolean open = false;
@@ -200,8 +200,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
 
 			tryExecute(env, "Sliding Window Test");
 		}
@@ -213,9 +212,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testPreAggregatedTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 1;
+		final int numElementsPerKey = 3000;
+		final int windowSize = 100;
+		final int numKeys = 1;
 		FailingSource.reset();
 
 		try {
@@ -226,11 +225,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_KEYS,
-							NUM_ELEMENTS_PER_KEY,
-							NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys,
+							numElementsPerKey,
+							numElementsPerKey / 3))
 					.rebalance()
-					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.reduce(
 							new ReduceFunction<Tuple2<Long, IntType>>() {
 
@@ -269,8 +268,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -282,9 +280,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testPreAggregatedFoldingTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 1;
+		final int numElementsPerKey = 3000;
+		final int windowSize = 100;
+		final int numKeys = 1;
 		FailingSource.reset();
 
 		try {
@@ -295,11 +293,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_KEYS,
-							NUM_ELEMENTS_PER_KEY,
-							NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys,
+							numElementsPerKey,
+							numElementsPerKey / 3))
 					.rebalance()
-					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.timeWindowAll(Time.of(windowSize, MILLISECONDS))
 					.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
 							new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
 								@Override
@@ -337,8 +335,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 									}
 								}
 							})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -350,10 +347,10 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testPreAggregatedSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 1000;
-		final int WINDOW_SLIDE = 100;
-		final int NUM_KEYS = 1;
+		final int numElementsPerKey = 3000;
+		final int windowSize = 1000;
+		final int windowSlide = 100;
+		final int numKeys = 1;
 		FailingSource.reset();
 
 		try {
@@ -364,12 +361,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_KEYS,
-							NUM_ELEMENTS_PER_KEY,
-							NUM_ELEMENTS_PER_KEY / 3))
+					.addSource(new FailingSource(numKeys,
+							numElementsPerKey,
+							numElementsPerKey / 3))
 					.rebalance()
-					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS),
-							Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.timeWindowAll(Time.of(windowSize, MILLISECONDS),
+							Time.of(windowSlide, MILLISECONDS))
 					.reduce(
 							new ReduceFunction<Tuple2<Long, IntType>>() {
 
@@ -408,8 +405,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -419,14 +415,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener
-	{
+			implements ListCheckpointed<Integer>, CheckpointListener {
 		private static volatile boolean failedBefore = false;
 
 		private final int numKeys;
@@ -467,8 +461,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 				}
 
 				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements))
-				{
+						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
 					// the function failed before, or we are in the elements before the failure
 					synchronized (ctx.getCheckpointLock()) {
 						int next = numElementsEmitted++;
@@ -579,7 +572,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
 
-
 			Integer curr = windowCounts.get(value.f0);
 			if (curr != null) {
 				windowCounts.put(value.f0, curr + 1);
@@ -625,12 +617,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Custom boxed integer type.
+	 */
 	public static class IntType {
 
 		public int value;
 
 		public IntType() {}
 
-		public IntType(int value) { this.value = value; }
+		public IntType(int value) {
+			this.value = value;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..030c1a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for file backend.
+ */
 public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public FileBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..dfb66cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for incremental RocksDB backend.
+ */
 public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 147d385..76a18c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -60,11 +60,11 @@ import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
- * 
- * The test triggers a failure after a while and verifies that, after
+ *
+ * <p>The test triggers a failure after a while and verifies that, after
  * completion, the state reflects the "exactly once" semantics.
- * 
- * It is designed to check partitioned states.
+ *
+ * <p>It is designed to check partitioned states.
  */
 @SuppressWarnings("serial")
 public class KeyedStateCheckpointingITCase extends TestLogger {
@@ -197,7 +197,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 	 * A source that generates a sequence of integers and throttles down until a checkpoint
 	 * has happened.
 	 */
-	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> 
+	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
 		implements ListCheckpointed<Integer>, CheckpointListener {
 
 		private final int numElements;
@@ -363,7 +363,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 		}
 	}
 
-	public static class IdentityKeySelector<T> implements KeySelector<T, T> {
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
 
 		@Override
 		public T getKey(T value) throws Exception {
@@ -375,6 +375,9 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 	//  data types
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Custom boxed long type that does not implement Serializable.
+	 */
 	public static class NonSerializableLong {
 
 		public long value;
@@ -389,7 +392,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 
 		@Override
 		public boolean equals(Object obj) {
-			return this == obj || 
+			return this == obj ||
 					obj != null && obj.getClass() == getClass() && ((NonSerializableLong) obj).value == this.value;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..54a29ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for memory backend.
+ */
 public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public MemBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 264b22e..cad6693 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -55,7 +55,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import io.netty.util.internal.ConcurrentSet;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -85,6 +84,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test savepoint rescaling.
+ */
 @RunWith(Parameterized.class)
 public class RescalingITCase extends TestLogger {
 
@@ -250,7 +252,6 @@ public class RescalingITCase extends TestLogger {
 
 			assertEquals(expectedResult2, actualResult2);
 
-
 		} finally {
 			// clear the CollectionSink set for the restarted job
 			CollectionSink.clearElementsSet();
@@ -502,7 +503,6 @@ public class RescalingITCase extends TestLogger {
 		testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.LIST_CHECKPOINTED);
 	}
 
-
 	/**
 	 * Tests rescaling of partitioned operator state. More specific, we test the mechanism with {@link ListCheckpointed}
 	 * as it subsumes {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}.
@@ -522,11 +522,11 @@ public class RescalingITCase extends TestLogger {
 
 		if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION ||
 				checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
-			PartitionedStateSource.CHECK_CORRECT_SNAPSHOT = new int[counterSize];
-			PartitionedStateSource.CHECK_CORRECT_RESTORE = new int[counterSize];
+			PartitionedStateSource.checkCorrectSnapshot = new int[counterSize];
+			PartitionedStateSource.checkCorrectRestore = new int[counterSize];
 		} else {
-			PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT = new int[counterSize];
-			PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE = new int[counterSize];
+			PartitionedStateSourceListCheckpointed.checkCorrectSnapshot = new int[counterSize];
+			PartitionedStateSourceListCheckpointed.checkCorrectRestore = new int[counterSize];
 		}
 
 		try {
@@ -584,29 +584,29 @@ public class RescalingITCase extends TestLogger {
 			int sumAct = 0;
 
 			if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION) {
-				for (int c : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
+				for (int c : PartitionedStateSource.checkCorrectSnapshot) {
 					sumExp += c;
 				}
 
-				for (int c : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
+				for (int c : PartitionedStateSource.checkCorrectRestore) {
 					sumAct += c;
 				}
 			} else if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
-				for (int c : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
+				for (int c : PartitionedStateSource.checkCorrectSnapshot) {
 					sumExp += c;
 				}
 
-				for (int c : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
+				for (int c : PartitionedStateSource.checkCorrectRestore) {
 					sumAct += c;
 				}
 
 				sumExp *= parallelism2;
 			} else {
-				for (int c : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT) {
+				for (int c : PartitionedStateSourceListCheckpointed.checkCorrectSnapshot) {
 					sumExp += c;
 				}
 
-				for (int c : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE) {
+				for (int c : PartitionedStateSourceListCheckpointed.checkCorrectRestore) {
 					sumAct += c;
 				}
 			}
@@ -777,8 +777,8 @@ public class RescalingITCase extends TestLogger {
 				if (counter < numberElements) {
 					synchronized (lock) {
 						for (int value = subtaskIndex;
-						     value < numberKeys;
-						     value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+							value < numberKeys;
+							value += getRuntimeContext().getNumberOfParallelSubtasks()) {
 
 							ctx.collect(value);
 						}
@@ -943,13 +943,13 @@ public class RescalingITCase extends TestLogger {
 		private static final long serialVersionUID = -4357864582992546L;
 		private static final int NUM_PARTITIONS = 7;
 
-		private static int[] CHECK_CORRECT_SNAPSHOT;
-		private static int[] CHECK_CORRECT_RESTORE;
+		private static int[] checkCorrectSnapshot;
+		private static int[] checkCorrectRestore;
 
 		@Override
 		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
 
-			CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+			checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 
 			int div = counter / NUM_PARTITIONS;
 			int mod = counter % NUM_PARTITIONS;
@@ -971,7 +971,7 @@ public class RescalingITCase extends TestLogger {
 			for (Integer v : state) {
 				counter += v;
 			}
-			CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+			checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 		}
 	}
 
@@ -983,8 +983,8 @@ public class RescalingITCase extends TestLogger {
 		private transient ListState<Integer> counterPartitions;
 		private boolean broadcast;
 
-		private static int[] CHECK_CORRECT_SNAPSHOT;
-		private static int[] CHECK_CORRECT_RESTORE;
+		private static int[] checkCorrectSnapshot;
+		private static int[] checkCorrectRestore;
 
 		public PartitionedStateSource(boolean broadcast) {
 			this.broadcast = broadcast;
@@ -995,7 +995,7 @@ public class RescalingITCase extends TestLogger {
 
 			counterPartitions.clear();
 
-			CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+			checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 
 			int div = counter / NUM_PARTITIONS;
 			int mod = counter % NUM_PARTITIONS;
@@ -1027,7 +1027,7 @@ public class RescalingITCase extends TestLogger {
 				for (int v : counterPartitions.get()) {
 					counter += v;
 				}
-				CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+				checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..3873aff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.checkpointing;
 
+/**
+ * Integration tests for fully synchronous RocksDB backend.
+ */
 public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
 	public RocksDbBackendEventTimeWindowCheckpointingITCase() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 09dfa99..a3d45dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -78,17 +73,18 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -103,6 +99,12 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -479,7 +481,7 @@ public class SavepointITCase extends TestLogger {
 	/**
 	 * FLINK-5985
 	 *
-	 * This test ensures we can restore from a savepoint under modifications to the job graph that only concern
+	 * <p>This test ensures we can restore from a savepoint under modifications to the job graph that only concern
 	 * stateless operators.
 	 */
 	@Test
@@ -736,17 +738,17 @@ public class SavepointITCase extends TestLogger {
 	}
 
 	private static final int ITER_TEST_PARALLELISM = 1;
-	private static OneShotLatch[] ITER_TEST_SNAPSHOT_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
-	private static OneShotLatch[] ITER_TEST_RESTORE_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
-	private static int[] ITER_TEST_CHECKPOINT_VERIFY = new int[ITER_TEST_PARALLELISM];
+	private static OneShotLatch[] iterTestSnapshotWait = new OneShotLatch[ITER_TEST_PARALLELISM];
+	private static OneShotLatch[] iterTestRestoreWait = new OneShotLatch[ITER_TEST_PARALLELISM];
+	private static int[] iterTestCheckpointVerify = new int[ITER_TEST_PARALLELISM];
 
 	@Test
 	public void testSavepointForJobWithIteration() throws Exception {
 
 		for (int i = 0; i < ITER_TEST_PARALLELISM; ++i) {
-			ITER_TEST_SNAPSHOT_WAIT[i] = new OneShotLatch();
-			ITER_TEST_RESTORE_WAIT[i] = new OneShotLatch();
-			ITER_TEST_CHECKPOINT_VERIFY[i] = 0;
+			iterTestSnapshotWait[i] = new OneShotLatch();
+			iterTestRestoreWait[i] = new OneShotLatch();
+			iterTestCheckpointVerify[i] = 0;
 		}
 
 		TemporaryFolder folder = new TemporaryFolder();
@@ -821,7 +823,7 @@ public class SavepointITCase extends TestLogger {
 			cluster.start();
 
 			cluster.submitJobDetached(jobGraph);
-			for (OneShotLatch latch : ITER_TEST_SNAPSHOT_WAIT) {
+			for (OneShotLatch latch : iterTestSnapshotWait) {
 				latch.await();
 			}
 			savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
@@ -831,7 +833,7 @@ public class SavepointITCase extends TestLogger {
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			cluster.submitJobDetached(jobGraph);
-			for (OneShotLatch latch : ITER_TEST_RESTORE_WAIT) {
+			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
 			source.cancel();
@@ -883,7 +885,7 @@ public class SavepointITCase extends TestLogger {
 
 		@Override
 		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
+			iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
 			return Collections.singletonList(emittedCount);
 		}
 
@@ -892,20 +894,20 @@ public class SavepointITCase extends TestLogger {
 			if (!state.isEmpty()) {
 				this.emittedCount = state.get(0);
 			}
-			Assert.assertEquals(ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()], emittedCount);
-			ITER_TEST_RESTORE_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+			Assert.assertEquals(iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()], emittedCount);
+			iterTestRestoreWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
 		}
 	}
 
-	public static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
+	private static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
 
-		static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
+		static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class, false);
 		private static final long serialVersionUID = 1L;
 		private ValueState<Boolean> operatorState;
 
 		@Override
 		public void open(Configuration configuration) {
-			operatorState = this.getRuntimeContext().getState(descriptor);
+			operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
 		}
 
 		@Override
@@ -916,7 +918,7 @@ public class SavepointITCase extends TestLogger {
 			}
 
 			if (30 == value) {
-				ITER_TEST_SNAPSHOT_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+				iterTestSnapshotWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 32d9e23..0fcfb8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,11 +47,11 @@ import static org.junit.Assert.assertTrue;
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
  *
- * The test triggers a failure after a while and verifies that, after completion, the
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
  * state defined with either the {@link ValueState} or the {@link ListCheckpointed}
  * interface reflects the "exactly once" semantics.
- * 
- * The test throttles the input until at least two checkpoints are completed, to make sure that
+ *
+ * <p>The test throttles the input until at least two checkpoints are completed, to make sure that
  * the recovery does not fall back to "square one" (which would naturally lead to correct
  * results without testing the checkpointing).
  */
@@ -59,11 +60,10 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
 
-	final long NUM_STRINGS = 10_000_000L;
+	static final long NUM_STRINGS = 10_000_000L;
 
 	/**
-	 * Runs the following program:
-	 *
+	 * Runs the following program.
 	 * <pre>
 	 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
@@ -84,7 +84,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		stream
 				// first vertex, chained to the source
 				// this filter throttles the flow until at least one checkpoint
-				// is complete, to make sure this program does not run without 
+				// is complete, to make sure this program does not run without
 				.filter(new StringRichFilterFunction())
 
 						// -------------- seconds vertex - one-to-one connected ----------------
@@ -100,13 +100,13 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 
 	@Override
 	public void postSubmit() {
-		
+
 		//assertTrue("Test inconclusive: failure occurred before first checkpoint",
 		//		OnceFailingAggregator.wasCheckpointedBeforeFailure);
-		if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+		if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
 			LOG.warn("Test inconclusive: failure occurred before first checkpoint");
 		}
-		
+
 		long filterSum = 0;
 		for (long l : StringRichFilterFunction.counts) {
 			filterSum += l;
@@ -137,10 +137,9 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
-	
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> 
-			implements ListCheckpointed<Integer>
-	{
+
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements ListCheckpointed<Integer> {
 		private final long numElements;
 
 		private int index;
@@ -157,9 +156,9 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 
 			final Random rnd = new Random();
 			final StringBuilder stringBuilder = new StringBuilder();
-			
+
 			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-			
+
 			if (index == 0) {
 				index = getRuntimeContext().getIndexOfThisSubtask();
 			}
@@ -178,7 +177,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 				}
 			}
 		}
-		
+
 		@Override
 		public void cancel() {
 			isRunning = false;
@@ -209,11 +208,11 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> 
+	private static class StringRichFilterFunction extends RichFilterFunction<String>
 			implements ListCheckpointed<Long> {
 
-		static final long[] counts = new long[PARALLELISM];
-		
+		static long[] counts = new long[PARALLELISM];
+
 		private long count;
 
 		@Override
@@ -241,10 +240,10 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> 
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
 			implements ListCheckpointed<Long> {
-		
-		static final long[] counts = new long[PARALLELISM];
+
+		static long[] counts = new long[PARALLELISM];
 
 		private long count;
 
@@ -272,12 +271,12 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 			this.count = state.get(0);
 		}
 	}
-	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
+
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
 		implements ListCheckpointed<Long> {
 
-		static final long[] counts = new long[PARALLELISM];
-		
+		static long[] counts = new long[PARALLELISM];
+
 		private long count;
 
 		@Override
@@ -304,25 +303,25 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 			this.count = state.get(0);
 		}
 	}
-	
-	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
+
+	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
 		implements ListCheckpointed<HashMap<String, PrefixCount>>, CheckpointListener {
 
 		static boolean wasCheckpointedBeforeFailure = false;
-		
+
 		private static volatile boolean hasFailed = false;
 
 		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
-		
+
 		private long failurePos;
 		private long count;
-		
+
 		private boolean wasCheckpointed;
 
 		OnceFailingAggregator(long failurePos) {
 			this.failurePos = failurePos;
 		}
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			count = 0;
@@ -336,7 +335,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 				hasFailed = true;
 				throw new Exception("Test Failure");
 			}
-			
+
 			PrefixCount curr = aggregationMap.get(value.prefix);
 			if (curr == null) {
 				aggregationMap.put(value.prefix, value);
@@ -367,12 +366,12 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 
-	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 
+	private static class ValidatingSink extends RichSinkFunction<PrefixCount>
 			implements ListCheckpointed<HashMap<Character, Long>> {
 
 		@SuppressWarnings("unchecked")
 		private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
-		
+
 		private HashMap<Character, Long> counts = new HashMap<Character, Long>();
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index d76d674..16d8b54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,12 +61,10 @@ import static org.junit.Assert.fail;
  * checkpoints, that it is called at most once for any checkpoint id and that it is not
  * called for a deliberately failed checkpoint.
  *
- * <p>
- * The topology tested here includes a number of {@link OneInputStreamOperator}s and a
+ * <p>The topology tested here includes a number of {@link OneInputStreamOperator}s and a
  * {@link TwoInputStreamOperator}.
  *
- * <p>
- * Note that as a result of doing the checks on the task level there is no way to verify
+ * <p>Note that as a result of doing the checks on the task level there is no way to verify
  * that the {@link CheckpointListener#notifyCheckpointComplete(long)} is called for every
  * successfully completed checkpoint.
  */
@@ -79,8 +76,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	private static final int PARALLELISM = 4;
 
 	/**
-	 * Runs the following program:
-	 *
+	 * Runs the following program.
 	 * <pre>
 	 *     [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
@@ -95,52 +91,52 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
 
 			final int numElements = 10000;
-			final int numTaskTotal = PARALLELISM * 5; 
+			final int numTaskTotal = PARALLELISM * 5;
 
 			DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
 
 			stream
 					// -------------- first vertex, chained to the src ----------------
 					.filter(new LongRichFilterFunction())
-	
+
 					// -------------- second vertex, applying the co-map ----------------
 					.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
-	
+
 					// -------------- third vertex - the stateful one that also fails ----------------
 					.map(new IdentityMapFunction())
 					.startNewChain()
-	
+
 					// -------------- fourth vertex - reducer and the sink ----------------
 					.keyBy(0)
 					.reduce(new OnceFailingReducer(numElements))
-				
+
 					.addSink(new DiscardingSink<Tuple1<Long>>());
-			
+
 			env.execute();
 
 			final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
 			assertNotEquals(0L, failureCheckpointID);
-			
+
 			List<List<Long>[]> allLists = Arrays.asList(
-				GeneratingSourceFunction.completedCheckpoints,
-				LongRichFilterFunction.completedCheckpoints,
-				LeftIdentityCoRichFlatMapFunction.completedCheckpoints,
-				IdentityMapFunction.completedCheckpoints,
-				OnceFailingReducer.completedCheckpoints
+				GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
+				LongRichFilterFunction.COMPLETED_CHECKPOINTS,
+				LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
+				IdentityMapFunction.COMPLETED_CHECKPOINTS,
+				OnceFailingReducer.COMPLETED_CHECKPOINTS
 			);
 
 			for (List<Long>[] parallelNotifications : allLists) {
 				for (List<Long> notifications : parallelNotifications) {
-					
-					assertTrue("No checkpoint notification was received.", 
+
+					assertTrue("No checkpoint notification was received.",
 						notifications.size() > 0);
-					
+
 					assertFalse("Failure checkpoint was marked as completed.",
 						notifications.contains(failureCheckpointID));
-					
+
 					assertFalse("No checkpoint received after failure.",
 						notifications.get(notifications.size() - 1) == failureCheckpointID);
-					
+
 					assertTrue("Checkpoint notification was received multiple times",
 						notifications.size() == new HashSet<Long>(notifications).size());
 				}
@@ -160,7 +156,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		}
 		return lists;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
@@ -171,21 +167,21 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	 */
 	private static class GeneratingSourceFunction extends RichSourceFunction<Long>
 			implements ParallelSourceFunction<Long>, CheckpointListener, ListCheckpointed<Integer> {
-		
-		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-		
+
+		static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
 		static AtomicLong numPostFailureNotifications = new AtomicLong();
 
 		// operator behaviour
 		private final long numElements;
-		
+
 		private final int notificationsToWaitFor;
 
 		private int index;
 		private int step;
 
 		private volatile boolean notificationAlready;
-		
+
 		private volatile boolean isRunning = true;
 
 		GeneratingSourceFunction(long numElements, int notificationsToWaitFor) {
@@ -198,8 +194,9 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
 
 			// if index has been restored, it is not 0 any more
-			if (index == 0)
+			if (index == 0) {
 				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
 		}
 
 		@Override
@@ -214,7 +211,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 					ctx.collect(result);
 				}
 			}
-			
+
 			// if the program goes fast and no notifications come through, we
 			// wait until all tasks had a chance to see a notification
 			while (isRunning && numPostFailureNotifications.get() < notificationsToWaitFor) {
@@ -244,7 +241,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		public void notifyCheckpointComplete(long checkpointId) {
 			// record the ID of the completed checkpoint
 			int partition = getRuntimeContext().getIndexOfThisSubtask();
-			completedCheckpoints[partition].add(checkpointId);
+			COMPLETED_CHECKPOINTS[partition].add(checkpointId);
 
 			// if this is the first time we get a notification since the failure,
 			// tell the source function
@@ -262,7 +259,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
 			implements CheckpointListener {
 
-		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+		static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
 
 		private volatile boolean notificationAlready;
 
@@ -275,7 +272,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		public void notifyCheckpointComplete(long checkpointId) {
 			// record the ID of the completed checkpoint
 			int partition = getRuntimeContext().getIndexOfThisSubtask();
-			completedCheckpoints[partition].add(checkpointId);
+			COMPLETED_CHECKPOINTS[partition].add(checkpointId);
 
 			// if this is the first time we get a notification since the failure,
 			// tell the source function
@@ -293,10 +290,10 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	 */
 	private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointListener {
 
-		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-		
+		static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
 		private volatile boolean notificationAlready;
-		
+
 		@Override
 		public boolean filter(Long value) {
 			return value < 100;
@@ -306,8 +303,8 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		public void notifyCheckpointComplete(long checkpointId) {
 			// record the ID of the completed checkpoint
 			int partition = getRuntimeContext().getIndexOfThisSubtask();
-			completedCheckpoints[partition].add(checkpointId);
-			
+			COMPLETED_CHECKPOINTS[partition].add(checkpointId);
+
 			// if this is the first time we get a notification since the failure,
 			// tell the source function
 			if (OnceFailingReducer.hasFailed && !notificationAlready) {
@@ -325,10 +322,10 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
 			implements CheckpointListener {
 
-		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+		static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
 
 		private volatile boolean notificationAlready;
-		
+
 		@Override
 		public void flatMap1(Long value, Collector<Long> out) {
 			out.collect(value);
@@ -343,7 +340,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		public void notifyCheckpointComplete(long checkpointId) {
 			// record the ID of the completed checkpoint
 			int partition = getRuntimeContext().getIndexOfThisSubtask();
-			completedCheckpoints[partition].add(checkpointId);
+			COMPLETED_CHECKPOINTS[partition].add(checkpointId);
 
 			// if this is the first time we get a notification since the failure,
 			// tell the source function
@@ -357,16 +354,15 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 	/**
 	 * Reducer that causes one failure between seeing 40% to 70% of the records.
 	 */
-	private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> 
-		implements ListCheckpointed<Long>, CheckpointListener
-	{
+	private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>>
+		implements ListCheckpointed<Long>, CheckpointListener {
 		static volatile boolean hasFailed = false;
 		static volatile long failureCheckpointID;
 
-		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-		
+		static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
 		private final long failurePos;
-		
+
 		private volatile long count;
 
 		private volatile boolean notificationAlready;
@@ -381,7 +377,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 			if (count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
 				LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<");
 			}
-			
+
 			value1.f0 += value2.f0;
 			return value1;
 		}
@@ -409,7 +405,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
 		public void notifyCheckpointComplete(long checkpointId) {
 			// record the ID of the completed checkpoint
 			int partition = getRuntimeContext().getIndexOfThisSubtask();
-			completedCheckpoints[partition].add(checkpointId);
+			COMPLETED_CHECKPOINTS[partition].add(checkpointId);
 
 			// if this is the first time we get a notification since the failure,
 			// tell the source function

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index aae04c9..616e794 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -41,18 +41,17 @@ import static org.junit.Assert.assertEquals;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
- * 
- * The test triggers a failure after a while and verifies that, after completion, the
+ *
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
  * state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
-	final long NUM_STRINGS = 10_000_000L;
+	static final long NUM_STRINGS = 10_000_000L;
 
 	/**
-	 * Runs the following program:
-	 *
+	 * Runs the following program.
 	 * <pre>
 	 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 	 * </pre>
@@ -101,7 +100,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		long reduceInputCount = 0;
-		for(long l: OnceFailingPrefixCounter.counts){
+		for (long l: OnceFailingPrefixCounter.counts){
 			reduceInputCount += l;
 		}
 
@@ -118,12 +117,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
 			implements ParallelSourceFunction<String>, ListCheckpointed<Integer> {
 
 		private final long numElements;
-		
+
 		private final Random rnd = new Random();
 		private final StringBuilder stringBuilder = new StringBuilder();
 
@@ -132,14 +131,13 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		private volatile boolean isRunning = true;
 
-		static final long[] counts = new long[PARALLELISM];
+		static long[] counts = new long[PARALLELISM];
 
 		@Override
 		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
 		}
 
-
 		StringGeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
@@ -200,11 +198,11 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			this.index = state.get(0);
 		}
 	}
-	
+
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> {
 
 		private long count;
-		static final long[] counts = new long[PARALLELISM];
+		static long[] counts = new long[PARALLELISM];
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
@@ -234,16 +232,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	/**
 	 * This function uses simultaneously the key/value state and is checkpointed.
 	 */
-	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> 
+	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount>
 			implements ListCheckpointed<Long> {
-		
+
 		private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
-		static final long[] counts = new long[PARALLELISM];
+		static long[] counts = new long[PARALLELISM];
 
 		private static volatile boolean hasFailed = false;
 
 		private final long numElements;
-		
+
 		private long failurePos;
 		private long count;
 
@@ -253,7 +251,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		OnceFailingPrefixCounter(long numElements) {
 			this.numElements = numElements;
 		}
-		
+
 		@Override
 		public void open(Configuration parameters) throws IOException {
 			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
@@ -261,10 +259,10 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
-			
+
 			pCount = getRuntimeContext().getState(new ValueStateDescriptor<>("pCount", Long.class, 0L));
 		}
-		
+
 		@Override
 		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount;
@@ -278,7 +276,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 				throw new Exception("Test Failure");
 			}
 			inputCount++;
-		
+
 			long currentPrefixCount = pCount.value() + value.count;
 			pCount.update(currentPrefixCount);
 			prefixCounts.put(value.prefix, currentPrefixCount);
@@ -301,8 +299,8 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	}
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
-		
-		static final long[] counts = new long[PARALLELISM];
+
+		static long[] counts = new long[PARALLELISM];
 
 		private long count;
 
@@ -333,8 +331,8 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
 			implements ListCheckpointed<Long> {
-		
-		static final long[] counts = new long[PARALLELISM];
+
+		static long[] counts = new long[PARALLELISM];
 
 		private long count;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5f56def..5d902ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -38,7 +38,7 @@ import java.io.Serializable;
 import static org.junit.Assert.fail;
 
 /**
- * Test base for fault tolerant streaming programs
+ * Test base for fault tolerant streaming programs.
  */
 public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
@@ -55,7 +55,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-			
+
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
@@ -82,12 +82,12 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	 * Implementations are expected to assemble the test topology in this function
 	 * using the provided {@link StreamExecutionEnvironment}.
 	 */
-	abstract public void testProgram(StreamExecutionEnvironment env);
+	public abstract void testProgram(StreamExecutionEnvironment env);
 
 	/**
 	 * Implementations are expected to provide test here to verify the correct behavior.
 	 */
-	abstract public void postSubmit() throws Exception ;
+	public abstract void postSubmit() throws Exception;
 
 	/**
 	 * Runs the following program the test program defined in {@link #testProgram(StreamExecutionEnvironment)}
@@ -118,6 +118,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	//  Frequently used utilities
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * POJO storing prefix, value, and count.
+	 */
 	@SuppressWarnings("serial")
 	public static class PrefixCount implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 0a89ab9..1cf5829 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,6 +29,9 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
+/**
+ * Test the {@link TimestampedFileInputSplit} for Continuous File Processing.
+ */
 public class TimestampedFileInputSplitTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index a219b68..f19d690 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import com.google.common.collect.EvictingQueue;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -33,6 +32,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+
+import com.google.common.collect.EvictingQueue;
 import org.junit.Assert;
 
 import java.util.Collections;
@@ -45,15 +46,14 @@ import java.util.Random;
  * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
  * a failure.
  *
- * <p>
- * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
+ * <p>The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
  * and the {@link StreamGroupedFold} operators.
  */
 @SuppressWarnings("serial")
 public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
 
-	final private static long NUM_INPUT = 500_000L;
-	final private static int NUM_OUTPUT = 1_000;
+	private static final long NUM_INPUT = 500_000L;
+	private static final int NUM_OUTPUT = 1_000;
 
 	/**
 	 * Assembles a stream of a grouping field and some long data. Applies reduce functions
@@ -66,7 +66,6 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 		KeyedStream<Tuple2<Integer, Long>, Tuple> stream = env.addSource(new StatefulMultipleSequence())
 				.keyBy(0);
 
-
 		stream
 				// testing built-in aggregate
 				.min(1)
@@ -184,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	 * Mapper that causes one failure between seeing 40% to 70% of the records.
 	 */
 	private static class OnceFailingIdentityMapFunction
-			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> 
+			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
 			implements ListCheckpointed<Long> {
 
 		private static volatile boolean hasFailed = false;
@@ -223,7 +222,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 
 		@Override
 		public void restoreState(List<Long> state) throws Exception {
-			if(!state.isEmpty()) {
+			if (!state.isEmpty()) {
 				count = state.get(0);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 7004f75..7ec4f86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -52,7 +53,6 @@ import java.util.HashMap;
 import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,7 +78,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	private static TestStreamEnvironment env;
 
-
 	@BeforeClass
 	public static void startTestCluster() {
 		Configuration config = new Configuration();
@@ -103,9 +102,9 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testTumblingProcessingTimeWindow() {
-		final int NUM_ELEMENTS = 3000;
+		final int numElements = 3000;
 		FailingSource.reset();
-		
+
 		try {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(timeCharacteristic);
@@ -115,7 +114,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
+					.addSource(new FailingSource(numElements, numElements / 3))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(100, MILLISECONDS))
@@ -145,8 +144,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_ELEMENTS, 1)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -158,7 +156,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testSlidingProcessingTimeWindow() {
-		final int NUM_ELEMENTS = 3000;
+		final int numElements = 3000;
 		FailingSource.reset();
 
 		try {
@@ -170,7 +168,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
+					.addSource(new FailingSource(numElements, numElements / 3))
 					.rebalance()
 					.keyBy(0)
 					.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
@@ -200,8 +198,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 							}
 						}
 					})
-					.addSink(new ValidatingSink(NUM_ELEMENTS, 3)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -213,7 +210,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testAggregatingTumblingProcessingTimeWindow() {
-		final int NUM_ELEMENTS = 3000;
+		final int numElements = 3000;
 		FailingSource.reset();
 
 		try {
@@ -225,8 +222,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
-					.map(new MapFunction<Tuple2<Long,IntType>, Tuple2<Long,IntType>>() {
+					.addSource(new FailingSource(numElements, numElements / 3))
+					.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
 						@Override
 						public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
 							value.f1.value = 1;
@@ -245,8 +242,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 							return new Tuple2<>(a.f0, new IntType(1));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_ELEMENTS, 1)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -258,7 +254,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@Test
 	public void testAggregatingSlidingProcessingTimeWindow() {
-		final int NUM_ELEMENTS = 3000;
+		final int numElements = 3000;
 		FailingSource.reset();
 
 		try {
@@ -270,8 +266,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 			env.getConfig().disableSysoutLogging();
 
 			env
-					.addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
-					.map(new MapFunction<Tuple2<Long,IntType>, Tuple2<Long,IntType>>() {
+					.addSource(new FailingSource(numElements, numElements / 3))
+					.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
 						@Override
 						public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
 							value.f1.value = 1;
@@ -289,8 +285,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 							return new Tuple2<>(a.f0, new IntType(1));
 						}
 					})
-					.addSink(new ValidatingSink(NUM_ELEMENTS, 3)).setParallelism(1);
-
+					.addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
 
 			tryExecute(env, "Tumbling Window Test");
 		}
@@ -305,8 +300,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener
-	{
+			implements ListCheckpointed<Integer>, CheckpointListener {
 		private static volatile boolean failedBefore = false;
 
 		private final int numElementsToEmit;
@@ -470,12 +464,17 @@ public class WindowCheckpointingITCase extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	/**
+	 * POJO with int value.
+	 */
 	public static class IntType {
 
 		public int value;
 
 		public IntType() {}
 
-		public IntType(int value) { this.value = value; }
+		public IntType(int value) {
+			this.value = value;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index e4004c7..21be7ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -18,21 +18,14 @@
 
 package org.apache.flink.test.checkpointing.utils;
 
-import java.io.File;
-import java.net.URI;
-import java.net.URL;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -42,12 +35,21 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
+
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -57,6 +59,9 @@ import scala.concurrent.duration.FiniteDuration;
 
 import static junit.framework.Assert.fail;
 
+/**
+ * Test savepoint migration.
+ */
 public class SavepointMigrationTestBase extends TestBaseUtils {
 
 	@Rule
@@ -120,7 +125,6 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-
 		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
 
 		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
index 4221670..da6e035 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -133,11 +134,10 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
-
 	@Test
 	public void testSavepointRestoreFromFlink11() throws Exception {
 
-		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+		final int expectedSuccessfulChecks = 21;
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -165,13 +165,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		restoreAndExecute(
 				env,
 				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
-				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
 	}
 
 	@Test
 	public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
 
-		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+		final int expectedSuccessfulChecks = 21;
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -199,13 +199,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		restoreAndExecute(
 				env,
 				getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
-				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
 	}
 
 	private static class LegacyCheckpointedSource
 			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
 
-		public static String CHECKPOINTED_STRING = "Here be dragons!";
+		public static String checkpointedString = "Here be dragons!";
 
 		private static final long serialVersionUID = 1L;
 
@@ -237,12 +237,12 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		@Override
 		public void restoreState(String state) throws Exception {
-			assertEquals(CHECKPOINTED_STRING, state);
+			assertEquals(checkpointedString, state);
 		}
 
 		@Override
 		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_STRING;
+			return checkpointedString;
 		}
 	}
 
@@ -271,7 +271,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		@Override
 		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
 
 			synchronized (ctx.getCheckpointLock()) {
@@ -296,12 +296,12 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements Checkpointed<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
-		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+		public static Tuple2<String, Long> checkpointedTuple =
 				new Tuple2<>("hello", 42L);
 
 		@Override
@@ -315,11 +315,11 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		@Override
 		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
+			return checkpointedTuple;
 		}
 	}
 
-	public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements CheckpointedRestoring<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
@@ -337,7 +337,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
 			out.collect(value);
 
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
 
 		}
@@ -348,13 +348,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class LegacyCheckpointedFlatMapWithKeyedState
+	private static class LegacyCheckpointedFlatMapWithKeyedState
 			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements Checkpointed<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
-		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+		public static Tuple2<String, Long> checkpointedTuple =
 				new Tuple2<>("hello", 42L);
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
@@ -373,11 +373,11 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		@Override
 		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
+			return checkpointedTuple;
 		}
 	}
 
-	public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements CheckpointedRestoring<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
@@ -404,7 +404,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 			}
 
 			assertEquals(value.f1, state.value());
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
 		}
 
@@ -414,7 +414,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -429,7 +429,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -457,7 +457,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckpointedUdfOperator
+	private static class CheckpointedUdfOperator
 			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
@@ -488,7 +488,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 //					checkpointId,
 //					timestamp);
 //
-//			out.writeUTF(CHECKPOINTED_STRING);
+//			out.writeUTF(checkpointedString);
 //
 //			result.setOperatorState(out.closeAndGetHandle());
 //
@@ -496,7 +496,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 //		}
 	}
 
-	public static class RestoringCheckingUdfOperator
+	private static class RestoringCheckingUdfOperator
 			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
@@ -535,7 +535,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+	private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
 		private static final long serialVersionUID = 1L;
 
 		private final String accumulatorName;