You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:24 UTC

[05/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index ce85b6a..71706d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
@@ -26,9 +24,12 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * Tests for {@link StreamMap}. These test that:
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 8a5c997..50dc4d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -39,12 +39,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.InputStream;
 import java.util.BitSet;
 
+/**
+ * Tests for {@link StreamOperator} snapshot restoration.
+ */
 public class StreamOperatorSnapshotRestoreTest {
 
 	private static final int MAX_PARALLELISM = 10;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index 28cdf9d..2bae429 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -34,6 +32,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.junit.Test;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * Tests for {@link StreamProject}. These test that:
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
index 3695120..d2cf2e6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -39,19 +40,22 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link StreamSource} awareness of source idleness.
+ */
 @RunWith(Parameterized.class)
 public class StreamSourceContextIdleDetectionTests {
 
 	/** The tests in this class will be parameterized with these enumerations.*/
 	private enum TestMethod {
 
-		/** test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method */
+		// test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method
 		COLLECT,
 
-		/** test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */
+		// test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method
 		COLLECT_WITH_TIMESTAMP,
 
-		/** test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method */
+		// test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method
 		EMIT_WATERMARK
 	}
 
@@ -73,7 +77,7 @@ public class StreamSourceContextIdleDetectionTests {
 	 * (7) Advance time to 510 and trigger idleness detection. Since no records were collected in-between the two
 	 *     idleness detections, status should have been toggle back to IDLE.
 	 *
-	 * Inline comments will refer to the corresponding tested steps in the scenario.
+	 * <p>Inline comments will refer to the corresponding tested steps in the scenario.
 	 */
 	@Test
 	public void testManualWatermarkContext() throws Exception {
@@ -103,12 +107,12 @@ public class StreamSourceContextIdleDetectionTests {
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 
 		// corresponds to step (3) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 
 		// corresponds to step (4) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + idleTimeout / 10);
 		switch (testMethod) {
 			case COLLECT:
 				context.collect("msg");
@@ -123,7 +127,7 @@ public class StreamSourceContextIdleDetectionTests {
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
 
 		// corresponds to step (5) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 2*idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 2 * idleTimeout / 10);
 		switch (testMethod) {
 			case COLLECT:
 				context.collect("msg");
@@ -138,11 +142,11 @@ public class StreamSourceContextIdleDetectionTests {
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
 
 		// corresponds to step (6) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + idleTimeout / 10);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
 
 		// corresponds to step (7) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 5*idleTimeout + idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 5 * idleTimeout + idleTimeout / 10);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 	}
 
@@ -159,7 +163,7 @@ public class StreamSourceContextIdleDetectionTests {
 	 *     should have been "piggy-backed" in the task, allowing the status to be toggled to IDLE before the next
 	 *     actual idle detection task at 530.
 	 *
-	 * Inline comments will refer to the corresponding tested steps in the scenario.
+	 * <p>Inline comments will refer to the corresponding tested steps in the scenario.
 	 */
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
@@ -189,24 +193,24 @@ public class StreamSourceContextIdleDetectionTests {
 		// corresponds to step (2) of scenario (please see method-level Javadoc comment)
 		processingTimeService.setCurrentTime(initialTime + watermarkInterval);
 		expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
-		processingTimeService.setCurrentTime(initialTime + 2*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 2 * watermarkInterval);
 		expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
 		processingTimeService.setCurrentTime(initialTime + idleTimeout);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 		assertEquals(expectedOutput, output);
 
 		// corresponds to step (3) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 3*watermarkInterval);
-		processingTimeService.setCurrentTime(initialTime + 4*watermarkInterval);
-		processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
-		processingTimeService.setCurrentTime(initialTime + 6*watermarkInterval);
-		processingTimeService.setCurrentTime(initialTime + 7*watermarkInterval);
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 3 * watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 4 * watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout);
+		processingTimeService.setCurrentTime(initialTime + 6 * watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 7 * watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 		assertEquals(expectedOutput, output);
 
 		// corresponds to step (4) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + idleTimeout / 10);
 		switch (testMethod) {
 			case COLLECT:
 				context.collect("msg");
@@ -232,8 +236,8 @@ public class StreamSourceContextIdleDetectionTests {
 		}
 
 		// corresponds to step (5) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 8*watermarkInterval);
-		processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 3*idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 8 * watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 3 * idleTimeout / 10);
 		switch (testMethod) {
 			case COLLECT:
 				context.collect("msg");
@@ -266,7 +270,7 @@ public class StreamSourceContextIdleDetectionTests {
 				assertEquals(expectedOutput, output);
 		}
 
-		processingTimeService.setCurrentTime(initialTime + 10*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 10 * watermarkInterval);
 		switch (testMethod) {
 			case COLLECT:
 			case COLLECT_WITH_TIMESTAMP:
@@ -280,7 +284,7 @@ public class StreamSourceContextIdleDetectionTests {
 		}
 
 		// corresponds to step (6) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+		processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + idleTimeout / 10);
 		switch (testMethod) {
 			case COLLECT:
 			case COLLECT_WITH_TIMESTAMP:
@@ -293,7 +297,7 @@ public class StreamSourceContextIdleDetectionTests {
 		}
 
 		// corresponds to step (7) of scenario (please see method-level Javadoc comment)
-		processingTimeService.setCurrentTime(initialTime + 11*watermarkInterval);
+		processingTimeService.setCurrentTime(initialTime + 11 * watermarkInterval);
 		assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
 		assertEquals(expectedOutput, output);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 5ca9cb4..2e24f4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
@@ -66,6 +67,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link StreamingRuntimeContext}.
+ */
 public class StreamingRuntimeContextTest {
 
 	@Test
@@ -245,7 +249,7 @@ public class StreamingRuntimeContextTest {
 
 		AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
 
-		KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class);
+		KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class);
 
 		DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);
 
@@ -271,7 +275,7 @@ public class StreamingRuntimeContextTest {
 		AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class);
+		KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class);
 
 		DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);
 
@@ -307,7 +311,7 @@ public class StreamingRuntimeContextTest {
 		AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class);
+		KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class);
 
 		DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index a03a4c5..f8b095c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b675cc5..6c93894 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -38,9 +39,11 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+/**
+ * Test snapshot state with {@link WrappingFunction}.
+ */
 public class WrappingFunctionSnapshotRestoreTest {
 
-
 	@Test
 	public void testSnapshotAndRestoreWrappedCheckpointedFunction() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 1d83229..f9a1cd0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -72,6 +73,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import javax.annotation.Nonnull;
+
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Comparator;
@@ -91,7 +93,14 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link AsyncWaitOperator}. These test that:
@@ -361,7 +370,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	/**
-	 *	Tests that the AsyncWaitOperator works together with chaining
+	 *	Tests that the AsyncWaitOperator works together with chaining.
 	 */
 	@Test
 	public void testOperatorChainWithProcessingTime() throws Exception {
@@ -601,7 +610,6 @@ public class AsyncWaitOperatorTest extends TestLogger {
 				super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
 		}
 
-
 		@Override
 		public void acknowledgeCheckpoint(
 				long checkpointId,
@@ -702,7 +710,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	 * emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
 	 * wait operators where the latter operator's queue is currently full).
 	 *
-	 * Note that this test does not enforce the exact strict ordering because with the fix it is no
+	 * <p>Note that this test does not enforce the exact strict ordering because with the fix it is no
 	 * longer possible. However, it provokes the described situation without the fix.
 	 */
 	@Test(timeout = 10000L)
@@ -880,7 +888,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	/**
 	 * FLINK-6435
 	 *
-	 * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until
+	 * <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until
 	 * another StreamElementQueueEntry is properly completed before it is collected.
 	 */
 	@Test(timeout = 2000)
@@ -891,7 +899,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	/**
 	 * FLINK-6435
 	 *
-	 * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until
+	 * <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until
 	 * another StreamElementQueueEntry is properly completed before it is collected.
 	 */
 	@Test(timeout = 2000)
@@ -945,7 +953,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	/**
 	 * FLINK-6435
 	 *
-	 * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
+	 * <p>Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
 	 * a StreamElementQueueEntry is completed in case of a timeout exception.
 	 */
 	@Test
@@ -956,7 +964,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	/**
 	 * FLINK-6435
 	 *
-	 * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
+	 * <p>Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
 	 * a StreamElementQueueEntry is completed in case of a timeout exception.
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
index c3a47aa..da2d795 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.CollectorOutput;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -46,6 +47,9 @@ import java.util.concurrent.TimeUnit;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for {@link Emitter}.
+ */
 public class EmitterTest extends TestLogger {
 
 	private static final long timeout = 10000L;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index 0380512..5832b89 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,7 +45,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
- * {@link OrderedStreamElementQueue} specific tests
+ * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index c9e59c7..fe9db95 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index 0a57f92..ba6ce42 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -45,7 +46,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
- * {@link UnorderedStreamElementQueue} specific tests
+ * {@link UnorderedStreamElementQueue} specific tests.
  */
 public class UnorderedStreamElementQueueTest extends TestLogger {
 	private static final long timeout = 10000L;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index c19eb37..beb5bf5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -15,23 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.api.operators.co;
 
+package org.apache.flink.streaming.api.operators.co;
 
-import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.junit.Assert.assertEquals;
-
 /**
  * Tests {@link CoProcessOperator}.
  */
@@ -97,7 +95,6 @@ public class CoProcessOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
-
 	private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index 4dbf7b8..6f7d097 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.api.operators.co;
 
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -30,9 +25,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * Tests for {@link CoStreamFlatMap}. These test that:
  *
@@ -45,7 +44,7 @@ import org.junit.Test;
 public class CoStreamFlatMapTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
+	private static final class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
index 28ae664..6826c96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public class CoStreamMapTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
+	private static final class MyCoMap implements CoMapFunction<Double, Integer, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -56,7 +57,6 @@ public class CoStreamMapTest implements Serializable {
 		}
 	}
 
-
 	@Test
 	public void testCoMap() throws Exception {
 		CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
index d8c9a61..3f590ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.api.operators.co;
 
+package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -324,7 +325,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
-
 	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -413,7 +413,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
 		}
 
-
 		@Override
 		public void onTimer(
 				long timestamp,
@@ -494,7 +493,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
 		}
 
-
 		@Override
 		public void onTimer(
 				long timestamp,
@@ -519,7 +517,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 			ctx.timerService().registerProcessingTimeTimer(5);
 		}
 
-
 		@Override
 		public void onTimer(
 				long timestamp,

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 4b0f5ab..7657ce7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -42,7 +41,9 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.util.Collector;
+
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -66,6 +67,9 @@ import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for {@link InternalWindowFunction}.
+ */
 public class InternalWindowFunctionTest {
 
 	@SuppressWarnings("unchecked")
@@ -98,12 +102,12 @@ public class InternalWindowFunctionTest {
 
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
-		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Iterable<Long> i = (Iterable<Long>) mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.process(((byte)0), w, ctx, i, c);
+		windowFunction.process(((byte) 0), w, ctx, i, c);
 		verify(mock).apply(w, i, c);
 
 		// check close
@@ -141,11 +145,11 @@ public class InternalWindowFunctionTest {
 
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
-		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Iterable<Long> i = (Iterable<Long>) mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
-		windowFunction.process(((byte)0), w, ctx, i, c);
+		windowFunction.process(((byte) 0), w, ctx, i, c);
 		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
@@ -183,7 +187,7 @@ public class InternalWindowFunctionTest {
 
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
-		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Iterable<Long> i = (Iterable<Long>) mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
@@ -225,7 +229,7 @@ public class InternalWindowFunctionTest {
 
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
-		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Iterable<Long> i = (Iterable<Long>) mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
@@ -288,7 +292,7 @@ public class InternalWindowFunctionTest {
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
 		windowFunction.process(42L, w, ctx, 23L, c);
-		verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+		verify(mock).apply(eq(42L), eq(w), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
 		windowFunction.close();
@@ -329,8 +333,8 @@ public class InternalWindowFunctionTest {
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.process(((byte)0), w, ctx, 23L, c);
-		verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+		windowFunction.process(((byte) 0), w, ctx, 23L, c);
+		verify(mock).apply(eq(w), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
 		windowFunction.close();
@@ -371,8 +375,8 @@ public class InternalWindowFunctionTest {
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.process(((byte)0), w, ctx, 23L, c);
-		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+		windowFunction.process(((byte) 0), w, ctx, 23L, c);
+		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
 		windowFunction.close();
@@ -424,9 +428,9 @@ public class InternalWindowFunctionTest {
 				return null;
 			}
 		}).when(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(),
-		(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+		(Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
-		windowFunction.process(42L, w, ctx,23L, c);
+		windowFunction.process(42L, w, ctx, 23L, c);
 		verify(ctx).currentProcessingTime();
 		verify(ctx).currentWatermark();
 		verify(ctx).windowState();
@@ -597,7 +601,7 @@ public class InternalWindowFunctionTest {
 		args.add(24L);
 		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.process(((byte)0), w, ctx, args, c);
+		windowFunction.process(((byte) 0), w, ctx, args, c);
 		verify(mock).process(
 				(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
 				(Iterable) argThat(containsInAnyOrder(allOf(
@@ -610,7 +614,7 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
-	public static class ProcessWindowFunctionMock
+	private static class ProcessWindowFunctionMock
 		extends ProcessWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 
@@ -624,7 +628,7 @@ public class InternalWindowFunctionTest {
 		}
 	}
 
-	public static class AggregateProcessWindowFunctionMock
+	private static class AggregateProcessWindowFunctionMock
 			extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
 			implements OutputTypeConfigurable<String> {
 
@@ -638,7 +642,7 @@ public class InternalWindowFunctionTest {
 		}
 	}
 
-	public static class AggregateProcessAllWindowFunctionMock
+	private static class AggregateProcessAllWindowFunctionMock
 			extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
 			implements OutputTypeConfigurable<String> {
 
@@ -651,7 +655,7 @@ public class InternalWindowFunctionTest {
 		public void process(Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
 	}
 
-	public static class WindowFunctionMock
+	private static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 
@@ -664,7 +668,7 @@ public class InternalWindowFunctionTest {
 		public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws Exception { }
 	}
 
-	public static class AllWindowFunctionMock
+	private static class AllWindowFunctionMock
 		extends RichAllWindowFunction<Long, String, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 
@@ -677,7 +681,7 @@ public class InternalWindowFunctionTest {
 		public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { }
 	}
 
-	public static class ProcessAllWindowFunctionMock
+	private static class ProcessAllWindowFunctionMock
 		extends ProcessAllWindowFunction<Long, String, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
index 6ee7d38..781a216 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
@@ -17,14 +17,17 @@
 
 package org.apache.flink.streaming.api.streamtask;
 
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
 
+/**
+ * Mock {@link RecordWriter}.
+ */
 public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
 
 	public ArrayList<Integer> emittedRecords;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
index dafdeed..4eda5ad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -24,8 +24,11 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link StreamIterationHead}.
+ */
 public class StreamIterationHeadTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
index b640d6f..bedab97 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.flink.streaming.api.windowing.deltafunction;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CosineDistance}.
+ */
 public class CosineDistanceTest {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -29,18 +33,18 @@ public class CosineDistanceTest {
 	public void testCosineDistance() {
 
 		//Reference calculated using wolfram alpha
-		double[][][] testdata={
-				{{0,0,0},{0,0,0}},
-				{{0,0,0},{1,2,3}},
-				{{1,2,3},{0,0,0}},
-				{{1,2,3},{4,5,6}},
-				{{1,2,3},{-4,-5,-6}},
-				{{1,2,-3},{-4,5,-6}},
-				{{1,2,3,4},{5,6,7,8}},
-				{{1,2},{3,4}},
-				{{1},{2}},
+		double[][][] testdata = {
+				{{0, 0, 0}, {0, 0, 0}},
+				{{0, 0, 0}, {1, 2, 3}},
+				{{1, 2, 3}, {0, 0, 0}},
+				{{1, 2, 3}, {4, 5, 6}},
+				{{1, 2, 3}, {-4, -5, -6}},
+				{{1, 2, -3}, {-4, 5, -6}},
+				{{1, 2, 3, 4}, {5, 6, 7, 8}},
+				{{1, 2}, {3, 4}},
+				{{1}, {2}},
 			};
-		double[] referenceSolutions={
+		double[] referenceSolutions = {
 				0,
 				0,
 				0,
@@ -60,12 +64,15 @@ public class CosineDistanceTest {
 	}
 
 	private String arrayToString(double[] in){
-		if (in.length==0) return "{}";
-		String result="{";
-		for (double d:in){
-			result+=d+",";
+		if (in.length == 0) {
+			return "{}";
+		}
+
+		String result = "{";
+		for (double d:in) {
+			result += d + ",";
 		}
-		return result.substring(0, result.length()-1)+"}";
+		return result.substring(0, result.length() - 1) + "}";
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
index 1ba5f84..c534bf8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.flink.streaming.api.windowing.deltafunction;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link EuclideanDistance}.
+ */
 public class EuclideanDistanceTest {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -29,18 +33,18 @@ public class EuclideanDistanceTest {
 	public void testEuclideanDistance() {
 
 		//Reference calculated using wolfram alpha
-		double[][][] testdata={
-				{{0,0,0},{0,0,0}},
-				{{0,0,0},{1,2,3}},
-				{{1,2,3},{0,0,0}},
-				{{1,2,3},{4,5,6}},
-				{{1,2,3},{-4,-5,-6}},
-				{{1,2,-3},{-4,5,-6}},
-				{{1,2,3,4},{5,6,7,8}},
-				{{1,2},{3,4}},
-				{{1},{2}},
+		double[][][] testdata = {
+				{{0, 0, 0}, {0, 0, 0}},
+				{{0, 0, 0}, {1, 2, 3}},
+				{{1, 2, 3}, {0, 0, 0}},
+				{{1, 2, 3}, {4, 5, 6}},
+				{{1, 2, 3}, {-4, -5, -6}},
+				{{1, 2, -3}, {-4, 5, -6}},
+				{{1, 2, 3, 4}, {5, 6, 7, 8}},
+				{{1, 2}, {3, 4}},
+				{{1}, {2}},
 			};
-		double[] referenceSolutions={
+		double[] referenceSolutions = {
 				0,
 				3.741657,
 				3.741657,
@@ -61,12 +65,15 @@ public class EuclideanDistanceTest {
 	}
 
 	private String arrayToString(double[] in){
-		if (in.length==0) return "{}";
-		String result="{";
+		if (in.length == 0) {
+			return "{}";
+		}
+
+		String result = "{";
 		for (double d:in){
-			result+=d+",";
+			result += d + ",";
 		}
-		return result.substring(0, result.length()-1)+"}";
+		return result.substring(0, result.length() - 1) + "}";
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index a897674..58898d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -166,7 +167,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	 * B (unchained): [ (src0) ] -> [ (map) -> (filter) -> (sink) ]
 	 * </pre>
 	 *
-	 * The hashes for the single vertex in A and the source vertex in B need to be different.
+	 * <p>The hashes for the single vertex in A and the source vertex in B need to be different.
 	 */
 	@Test
 	public void testNodeHashAfterSourceUnchaining() throws Exception {
@@ -208,7 +209,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	 * B (unchained): [ (src0) ] -> [ (map) -> (filter) -> (sink) ]
 	 * </pre>
 	 *
-	 * The hashes for the single vertex in A and the source vertex in B need to be different.
+	 * <p>The hashes for the single vertex in A and the source vertex in B need to be different.
 	 */
 	@Test
 	public void testNodeHashAfterIntermediateUnchaining() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
index 4175f18..e02ccdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -26,8 +26,12 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Test translation of {@link CheckpointingMode}.
+ */
 @SuppressWarnings("serial")
 public class TranslationTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index 8065cf1..5d606ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -28,17 +28,20 @@ import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that when sources implement {@link WithMasterCheckpointHook} the hooks are properly

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 4322748..b5ea866 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,7 +47,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.eq;
@@ -57,7 +55,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the barrier buffer's maximum limit of buffered/spilled bytes
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
  */
 public class BarrierBufferAlignmentLimitTest {
 
@@ -65,7 +63,7 @@ public class BarrierBufferAlignmentLimitTest {
 
 	private static final Random RND = new Random();
 
-	private static IOManager IO_MANAGER;
+	private static IOManager ioManager;
 
 	// ------------------------------------------------------------------------
 	//  Setup
@@ -73,12 +71,12 @@ public class BarrierBufferAlignmentLimitTest {
 
 	@BeforeClass
 	public static void setup() {
-		IO_MANAGER = new IOManagerAsync();
+		ioManager = new IOManagerAsync();
 	}
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
+		ioManager.shutdown();
 	}
 
 	// ------------------------------------------------------------------------
@@ -86,7 +84,7 @@ public class BarrierBufferAlignmentLimitTest {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * This tests that a single alignment that buffers too much data cancels
+	 * This tests that a single alignment that buffers too much data cancels.
 	 */
 	@Test
 	public void testBreakCheckpointAtAlignmentLimit() throws Exception {
@@ -116,7 +114,7 @@ public class BarrierBufferAlignmentLimitTest {
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 1000);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -173,7 +171,7 @@ public class BarrierBufferAlignmentLimitTest {
 	 *   - an alignment starts
 	 *   - barriers from a second checkpoint queue before the first completes
 	 *   - together they are larger than the threshold
-	 *   - after the first checkpoint (with second checkpoint data queued) aborts, the second completes
+	 *   - after the first checkpoint (with second checkpoint data queued) aborts, the second completes.
 	 */
 	@Test
 	public void testAlignmentLimitWithQueuedAlignments() throws Exception {
@@ -210,7 +208,7 @@ public class BarrierBufferAlignmentLimitTest {
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 500);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -315,7 +313,7 @@ public class BarrierBufferAlignmentLimitTest {
 
 	private static void checkNoTempFilesRemain() {
 		// validate that all temp files have been removed
-		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+		for (File dir : ioManager.getSpillingDirectories()) {
 			for (String file : dir.list()) {
 				if (file != null && !(file.equals(".") || file.equals(".."))) {
 					fail("barrier buffer did not clean up temp files. remaining file: " + file);
@@ -325,7 +323,7 @@ public class BarrierBufferAlignmentLimitTest {
 	}
 
 	/**
-	 * A validation matcher for checkpoint metadata against checkpoint IDs
+	 * A validation matcher for checkpoint metadata against checkpoint IDs.
 	 */
 	private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 3514f56..49d07b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -82,11 +83,11 @@ public class BarrierBufferMassiveRandomTest {
 	//  Mocks and Generators
 	// ------------------------------------------------------------------------
 
-	protected interface BarrierGenerator {
-		public boolean isNextBarrier();
+	private interface BarrierGenerator {
+		boolean isNextBarrier();
 	}
 
-	protected static class RandomBarrier implements BarrierGenerator {
+	private static class RandomBarrier implements BarrierGenerator {
 
 		private static final Random rnd = new Random();
 
@@ -117,7 +118,7 @@ public class BarrierBufferMassiveRandomTest {
 		}
 	}
 
-	protected static class RandomGeneratingInputGate implements InputGate {
+	private static class RandomGeneratingInputGate implements InputGate {
 
 		private final int numChannels;
 		private final BufferPool[] bufferPools;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index fedf4fc..c2cf7f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.TaskStateHandles;
+
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.junit.AfterClass;
@@ -67,19 +68,19 @@ public class BarrierBufferTest {
 
 	private static final int PAGE_SIZE = 512;
 
-	private static int SIZE_COUNTER = 0;
+	private static int sizeCounter = 0;
 
-	private static IOManager IO_MANAGER;
+	private static IOManager ioManager;
 
 	@BeforeClass
 	public static void setup() {
-		IO_MANAGER = new IOManagerAsync();
-		SIZE_COUNTER = 1;
+		ioManager = new IOManagerAsync();
+		sizeCounter = 1;
 	}
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
+		ioManager.shutdown();
 	}
 
 	// ------------------------------------------------------------------------
@@ -99,7 +100,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
@@ -134,7 +135,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
@@ -173,7 +174,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -239,7 +240,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -348,7 +349,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -439,7 +440,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -547,7 +548,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			StatefulTask toNotify = mock(StatefulTask.class);
 			buffer.registerCheckpointEventHandler(toNotify);
@@ -647,7 +648,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -752,7 +753,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			// checkpoint 1
 			check(sequence[0], buffer.getNextNonBlocked());
@@ -823,7 +824,7 @@ public class BarrierBufferTest {
 			};
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
@@ -892,7 +893,7 @@ public class BarrierBufferTest {
 
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
 
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 			// pre checkpoint 2
 			check(sequence[0], buffer.getNextNonBlocked());
@@ -958,7 +959,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		// data after first checkpoint
 		check(sequence[3], buffer.getNextNonBlocked());
@@ -1002,7 +1003,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -1066,7 +1067,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -1160,7 +1161,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -1214,7 +1215,7 @@ public class BarrierBufferTest {
 	 * This tests the where a replay of queued checkpoint barriers meets
 	 * a canceled checkpoint.
 	 *
-	 * The replayed newer checkpoint barrier must not try to cancel the
+	 * <p>The replayed newer checkpoint barrier must not try to cancel the
 	 * already canceled checkpoint.
 	 */
 	@Test
@@ -1251,7 +1252,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -1336,7 +1337,7 @@ public class BarrierBufferTest {
 		};
 
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+		BarrierBuffer buffer = new BarrierBuffer(gate, ioManager);
 
 		StatefulTask toNotify = mock(StatefulTask.class);
 		buffer.registerCheckpointEventHandler(toNotify);
@@ -1398,7 +1399,7 @@ public class BarrierBufferTest {
 	}
 
 	private static BufferOrEvent createBuffer(int channel) {
-		final int size = SIZE_COUNTER++;
+		final int size = sizeCounter++;
 		byte[] bytes = new byte[size];
 		RND.nextBytes(bytes);
 
@@ -1436,7 +1437,7 @@ public class BarrierBufferTest {
 
 	private static void checkNoTempFilesRemain() {
 		// validate that all temp files have been removed
-		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+		for (File dir : ioManager.getSpillingDirectories()) {
 			for (String file : dir.list()) {
 				if (file != null && !(file.equals(".") || file.equals(".."))) {
 					fail("barrier buffer did not clean up temp files. remaining file: " + file);

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 8c66205..847db5c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -263,7 +263,7 @@ public class BarrierTrackerTest {
 	 * discard a pending checkpoint as soon as it sees a barrier from a
 	 * later checkpoint from some channel.
 	 *
-	 * This behavior is crucial, otherwise topologies where different inputs
+	 * <p>This behavior is crucial, otherwise topologies where different inputs
 	 * have different latency (and that latency is close to or higher than the
 	 * checkpoint interval) may skip many checkpoints, or fail to complete a
 	 * checkpoint all together.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 905bc59..4edb665 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -31,7 +31,6 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,16 +39,23 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Random;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import static org.junit.Assert.*;
-
+/**
+ * Tests for {@link BufferSpiller}.
+ */
 public class BufferSpillerTest {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class);
 
 	private static final int PAGE_SIZE = 4096;
 
-	private static IOManager IO_MANAGER;
+	private static IOManager ioManager;
 
 	private BufferSpiller spiller;
 
@@ -60,18 +66,18 @@ public class BufferSpillerTest {
 
 	@BeforeClass
 	public static void setupIOManager() {
-		IO_MANAGER = new IOManagerAsync();
+		ioManager = new IOManagerAsync();
 	}
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
+		ioManager.shutdown();
 	}
 
 	@Before
 	public void createSpiller() {
 		try {
-			spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
+			spiller = new BufferSpiller(ioManager, PAGE_SIZE);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -205,7 +211,7 @@ public class BufferSpillerTest {
 			int currentNumRecordAndEvents = 0;
 
 			// do multiple spilling / rolling over rounds
-			for (int round = 0; round < 2*sequences; round++) {
+			for (int round = 0; round < 2 * sequences; round++) {
 
 				if (round % 2 == 1) {
 					// make this an empty sequence
@@ -392,7 +398,7 @@ public class BufferSpillerTest {
 
 	private static void checkNoTempFilesRemain() {
 		// validate that all temp files have been removed
-		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+		for (File dir : ioManager.getSpillingDirectories()) {
 			for (String file : dir.list()) {
 				if (file != null && !(file.equals(".") || file.equals(".."))) {
 					fail("barrier buffer did not clean up temp files. remaining file: " + file);

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 3e2a75a..77c938a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -28,6 +28,9 @@ import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
 
+/**
+ * Mock {@link InputGate}.
+ */
 public class MockInputGate implements InputGate {
 
 	private final int pageSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
index a6e834c..adbe240 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
@@ -37,7 +37,11 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Random;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation,
@@ -46,12 +50,11 @@ import static org.junit.Assert.*;
 public class SpilledBufferOrEventSequenceTest {
 
 	private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);
-	private final int pageSize = 32*1024;
+	private final int pageSize = 32 * 1024;
 
 	private File tempFile;
 	private FileChannel fileChannel;
 
-
 	@Before
 	public void initTempChannel() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 54cd186..d114139 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -29,14 +29,16 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * This test uses the PowerMockRunner runner to work around the fact that the
@@ -99,7 +101,6 @@ public class StreamRecordWriterTest {
 		when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
 		when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
 
-
 		return mockWriter;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
index 466ca65..2d0855a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,6 +42,9 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 
+/**
+ * Test processing files during rescaling.
+ */
 public class ContinuousFileProcessingRescalingTest {
 
 	@Test
@@ -83,7 +87,6 @@ public class ContinuousFileProcessingRescalingTest {
 		testHarness1.getOutput().clear();
 		testHarness2.getOutput().clear();
 
-
 		// 2) and take the snapshots from the previous instances and merge them
 		// into a new one which will be then used to initialize a third instance
 		OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 9bcd2e6..5085eb4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -33,6 +33,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Tests for {@link GenericWriteAheadSink}.
+ */
 public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
 
 	@Override
@@ -50,7 +53,6 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		return new Tuple1<>(counter);
 	}
 
-
 	@Override
 	protected void verifyResultsIdealCircumstances(ListSink sink) {
 
@@ -182,7 +184,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		}
 	}
 
-	public static class SimpleCommitter extends CheckpointCommitter {
+	private static class SimpleCommitter extends CheckpointCommitter {
 		private static final long serialVersionUID = 1L;
 
 		private List<Tuple2<Long, Integer>> checkpoints;
@@ -232,7 +234,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		}
 	}
 
-	public static class FailingCommitter extends CheckpointCommitter {
+	private static class FailingCommitter extends CheckpointCommitter {
 		private static final long serialVersionUID = 1L;
 
 		private List<Tuple2<Long, Integer>> checkpoints;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index c95a85e..8d99acd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -80,7 +82,7 @@ public class StreamOperatorChainingTest {
 	private void testMultiChaining(StreamExecutionEnvironment env) throws Exception {
 
 		// the actual elements will not be used
-		DataStream<Integer> input = env.fromElements(1,2,3);
+		DataStream<Integer> input = env.fromElements(1, 2, 3);
 
 		sink1Results = new ArrayList<>();
 		sink2Results = new ArrayList<>();
@@ -188,7 +190,7 @@ public class StreamOperatorChainingTest {
 	private void testMultiChainingWithSplit(StreamExecutionEnvironment env) throws Exception {
 
 		// the actual elements will not be used
-		DataStream<Integer> input = env.fromElements(1,2,3);
+		DataStream<Integer> input = env.fromElements(1, 2, 3);
 
 		sink1Results = new ArrayList<>();
 		sink2Results = new ArrayList<>();