You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:23 UTC

[04/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 2a6a723..d9fcc12 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -37,11 +37,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -52,12 +52,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link StreamSource} operators.
+ */
 @SuppressWarnings("serial")
 public class StreamSourceOperatorTest {
 
@@ -86,7 +89,6 @@ public class StreamSourceOperatorTest {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.cancel();
 
@@ -106,7 +108,6 @@ public class StreamSourceOperatorTest {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 
 		// trigger an async cancel in a bit
@@ -139,7 +140,6 @@ public class StreamSourceOperatorTest {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.stop();
 
@@ -158,7 +158,6 @@ public class StreamSourceOperatorTest {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 
 		// trigger an async cancel in a bit
@@ -179,7 +178,7 @@ public class StreamSourceOperatorTest {
 	}
 
 	/**
-	 * Test that latency marks are emitted
+	 * Test that latency marks are emitted.
 	 */
 	@Test
 	public void testLatencyMarkEmission() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 6772db4..6e3be03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -24,15 +24,16 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
@@ -65,7 +66,6 @@ public class StreamTaskTimerTest {
 
 		assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
 
-
 		testHarness.endInput();
 		testHarness.waitForTaskCompletion();
 
@@ -109,8 +109,7 @@ public class StreamTaskTimerTest {
 			long deadline = System.currentTimeMillis() + 20000;
 			while (errorRef.get() == null &&
 					ValidatingProcessingTimeCallback.numInSequence < 4 &&
-					System.currentTimeMillis() < deadline)
-			{
+					System.currentTimeMillis() < deadline) {
 				Thread.sleep(100);
 			}
 
@@ -170,6 +169,9 @@ public class StreamTaskTimerTest {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Identity mapper.
+	 */
 	public static class DummyMapFunction<T> implements MapFunction<T, T> {
 		@Override
 		public T map(T value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 9897884..675ffa3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.junit.Test;
 
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link TestProcessingTimeService}.
+ */
 public class TestProcessingTimeServiceTest {
 
 	@Test
@@ -90,6 +93,9 @@ public class TestProcessingTimeServiceTest {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * An {@link AsyncExceptionHandler} storing the handled exception.
+	 */
 	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
 
 		private final AtomicReference<Throwable> errorReference;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index f129c20..51af116 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -28,8 +28,13 @@ import org.junit.Test;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link TimestampsAndPeriodicWatermarksOperator}.
+ */
 public class TimestampsAndPeriodicWatermarksOperatorTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
index 0333e93..a422432 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
@@ -30,6 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link TimestampsAndPunctuatedWatermarksOperator}.
+ */
 public class TimestampsAndPunctuatedWatermarksOperatorTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 46d92af..d3fd585 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -27,6 +27,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+/**
+ * Test base for {@link GenericWriteAheadSink}.
+ */
 public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
 
 	protected abstract S createSink() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f7e302..a57dcf1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -68,6 +68,9 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link AccumulatingProcessingTimeWindowOperator}.
+ */
 @SuppressWarnings({"serial"})
 @PrepareForTest(InternalIterableWindowFunction.class)
 @RunWith(PowerMockRunner.class)
@@ -220,7 +223,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			testHarness.close();
 
-
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
 
@@ -278,7 +280,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.setProcessingTime(currentTime);
 			}
 
-
 			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(numElements, result.size());
 
@@ -322,7 +323,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.setProcessingTime(currentTime);
 			}
 
-
 			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(numElements, result.size());
 
@@ -476,7 +476,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			testHarness.setProcessingTime(200);
 
-
 			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(6, result.size());
 
@@ -524,7 +523,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			testHarness.setProcessingTime(200);
 
-
 			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 			assertEquals(6, result.size());
 
@@ -837,7 +835,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			testHarness.restore(state);
 			testHarness.open();
 
-
 			// inject again the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
 				testHarness.processElement(new StreamRecord<>(i));
@@ -929,7 +926,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			testHarness.restore(state);
 			testHarness.open();
 
-
 			// inject again the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
 				testHarness.processElement(new StreamRecord<>(i));
@@ -1040,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		// we use a concurrent map here even though there is no concurrency, to
 		// get "volatile" style access to entries
-		static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+		private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
 
 		private ValueState<Integer> state;
 
@@ -1053,9 +1049,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		@Override
 		public void process(Integer key,
-						  Context context,
-						  Iterable<Integer> values,
-						  Collector<Integer> out) throws Exception {
+						Context context,
+						Iterable<Integer> values,
+						Collector<Integer> out) throws Exception {
 			for (Integer i : values) {
 				// we need to update this state before emitting elements. Else, the test's main
 				// thread will have received all output elements before the state is updated and
@@ -1093,8 +1089,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
-		final ProcessingTimeService timerService)
-	{
+		final ProcessingTimeService timerService) {
 		StreamTask<?, ?> mockTask = createMockTask();
 		when(mockTask.getProcessingTimeService()).thenReturn(timerService);
 		return mockTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 1875bbb..62f4f0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -56,6 +56,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for aligned {@link AggregatingProcessingTimeWindowOperator}.
+ */
 @SuppressWarnings("serial")
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
@@ -66,9 +69,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
 
 	private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
-			new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+			new KeySelector<Tuple2<Integer, Integer>, Integer>() {
 				@Override
-				public Integer getKey(Tuple2<Integer,Integer> value) {
+				public Integer getKey(Tuple2<Integer, Integer> value) {
 					return value.f0;
 				}
 	};
@@ -585,7 +588,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSlide);
 
-
 			OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
 					new OneInputStreamOperatorTestHarness<>(op);
 
@@ -824,7 +826,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
 
-		static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+		private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
 
 		private ValueState<Integer> state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 81a9275..a7c6f47 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -1244,7 +1245,6 @@ public class AllWindowTranslationTest {
 
 	}
 
-
 	@Test
 	@SuppressWarnings("rawtypes")
 	public void testReduceWithCustomTrigger() throws Exception {
@@ -1461,7 +1461,7 @@ public class AllWindowTranslationTest {
 	//  UDFs
 	// ------------------------------------------------------------------------
 
-	public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
+	private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
index 0f65a88..9c14a9f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -62,7 +64,7 @@ public class ContinuousEventTimeTriggerTest {
 
 
 	/**
-	 * Verify that state <TimeWindow>of separate windows does not leak into other windows.
+	 * Verify that state &lt;TimeWindow&gt;of separate windows does not leak into other windows.
 	 */
 	@Test
 	public void testWindowSeparationAndFiring() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
index 16e353b..38dd01d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
index a46572b..23af838 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -15,10 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -28,6 +27,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -38,27 +39,35 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
- * Tests for {@link EventTimeSessionWindows}
+ * Tests for {@link EventTimeSessionWindows}.
  */
 public class EventTimeSessionWindowsTest extends TestLogger {
 
 	@Test
 	public void testWindowAssignment() {
-		final int SESSION_GAP = 5000;
+		final int sessionGap = 5000;
 
 		WindowAssigner.WindowAssignerContext mockContext =
 				mock(WindowAssigner.WindowAssignerContext.class);
 
-		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP));
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(sessionGap));
 
-		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
-		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
-		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap)));
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
 	}
 
 	@Test
@@ -138,16 +147,16 @@ public class EventTimeSessionWindowsTest extends TestLogger {
 	public void testTimeUnits() {
 		// sanity check with one other time unit
 
-		final int SESSION_GAP = 5000;
+		final int sessionGap = 5000;
 
 		WindowAssigner.WindowAssignerContext mockContext =
 				mock(WindowAssigner.WindowAssignerContext.class);
 
-		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000));
+		EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(sessionGap / 1000));
 
-		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP)));
-		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
-		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap)));
+		assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+		assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
index 2d93ac0..2bcc192 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
index 7af4506..a89aec0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index e5d3ef0..8d65bb4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,18 +60,21 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Tests for {@link EvictingWindowOperator}.
+ */
 public class EvictingWindowOperatorTest {
 
 	/**
-	 * Tests CountEvictor evictAfter behavior
+	 * Tests CountEvictor evictAfter behavior.
 	 * @throws Exception
      */
 	@Test
 	public void testCountEvictorEvictAfter() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int WINDOW_SIZE = 4;
-		final int TRIGGER_COUNT = 2;
-		final boolean EVICT_AFTER = true;
+		final int windowSize = 4;
+		final int triggerCount = 2;
+		final boolean evictAfter = true;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -80,7 +85,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -88,16 +92,14 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
-			CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+			CountTrigger.of(triggerCount),
+			CountEvictor.of(windowSize, evictAfter),
 			0,
 			null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-
 		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -114,8 +116,6 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -139,18 +139,17 @@ public class EvictingWindowOperatorTest {
 		testHarness.close();
 
 		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
 	}
 
 	/**
-	 * Tests TimeEvictor evictAfter behavior
+	 * Tests TimeEvictor evictAfter behavior.
 	 * @throws Exception
 	 */
 	@Test
 	public void testTimeEvictorEvictAfter() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int TRIGGER_COUNT = 2;
-		final boolean EVICT_AFTER = true;
+		final int triggerCount = 2;
+		final boolean evictAfter = true;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -161,7 +160,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -169,12 +167,11 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
-			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			CountTrigger.of(triggerCount),
+			TimeEvictor.of(Time.seconds(2), evictAfter),
 			0,
 			null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -194,13 +191,10 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
@@ -218,14 +212,14 @@ public class EvictingWindowOperatorTest {
 	}
 
 	/**
-	 * Tests TimeEvictor evictBefore behavior
+	 * Tests TimeEvictor evictBefore behavior.
 	 * @throws Exception
 	 */
 	@Test
 	public void testTimeEvictorEvictBefore() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int TRIGGER_COUNT = 2;
-		final int WINDOW_SIZE = 4;
+		final int triggerCount = 2;
+		final int windowSize = 4;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -236,20 +230,18 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
-			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 			new TimeWindow.Serializer(),
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
+			CountTrigger.of(triggerCount),
 			TimeEvictor.of(Time.seconds(2)),
 			0,
 			null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -270,13 +262,10 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500));
@@ -290,19 +279,18 @@ public class EvictingWindowOperatorTest {
 		testHarness.close();
 
 		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
 	}
 
 	/**
-	 * Tests time evictor, if no timestamp information in the StreamRecord
-	 * No element will be evicted from the window
+	 * Tests time evictor, if no timestamp information in the StreamRecord.
+	 * No element will be evicted from the window.
 	 * @throws Exception
 	 */
 	@Test
 	public void testTimeEvictorNoTimestamp() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int TRIGGER_COUNT = 2;
-		final boolean EVICT_AFTER = true;
+		final int triggerCount = 2;
+		final boolean evictAfter = true;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -313,7 +301,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -321,12 +308,11 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
-			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			CountTrigger.of(triggerCount),
+			TimeEvictor.of(Time.seconds(2), evictAfter),
 			0,
 			null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -345,13 +331,10 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
@@ -365,19 +348,18 @@ public class EvictingWindowOperatorTest {
 		testHarness.close();
 
 		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
 	}
 
 	/**
-	 * Tests DeltaEvictor, evictBefore behavior
+	 * Tests DeltaEvictor, evictBefore behavior.
 	 * @throws Exception
 	 */
 	@Test
 	public void testDeltaEvictorEvictBefore() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int TRIGGER_COUNT = 2;
-		final boolean EVICT_AFTER = false;
-		final int THRESHOLD = 2;
+		final int triggerCount = 2;
+		final boolean evictAfter = false;
+		final int threshold = 2;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -388,7 +370,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -396,18 +377,16 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
-			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+			CountTrigger.of(triggerCount),
+			DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
 				@Override
 				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
 					return newDataPoint.f1 - oldDataPoint.f1;
 				}
-			}, EVICT_AFTER),
+			}, evictAfter),
 			0,
 			null /* late data output tag */);
 
-
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -447,15 +426,15 @@ public class EvictingWindowOperatorTest {
 	}
 
 	/**
-	 * Tests DeltaEvictor, evictAfter behavior
+	 * Tests DeltaEvictor, evictAfter behavior.
 	 * @throws Exception
 	 */
 	@Test
 	public void testDeltaEvictorEvictAfter() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
-		final int TRIGGER_COUNT = 2;
-		final boolean EVICT_AFTER = true;
-		final int THRESHOLD = 2;
+		final int triggerCount = 2;
+		final boolean evictAfter = true;
+		final int threshold = 2;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -466,7 +445,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 			new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -474,18 +452,16 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(TRIGGER_COUNT),
-			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+			CountTrigger.of(triggerCount),
+			DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
 				@Override
 				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
 					return newDataPoint.f1 - oldDataPoint.f1;
 				}
-			}, EVICT_AFTER),
+			}, evictAfter),
 			0,
 			null /* late data output tag */);
 
-
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -528,8 +504,8 @@ public class EvictingWindowOperatorTest {
 	@SuppressWarnings("unchecked")
 	public void testCountTrigger() throws Exception {
 
-		final int WINDOW_SIZE = 4;
-		final int WINDOW_SLIDE = 2;
+		final int windowSize = 4;
+		final int windowSlide = 2;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -540,7 +516,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 				GlobalWindows.create(),
 				new GlobalWindow.Serializer(),
@@ -552,12 +527,11 @@ public class EvictingWindowOperatorTest {
 								new SumReducer(),
 								// on some versions of Java we seem to need the explicit type
 								new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
-				CountTrigger.of(WINDOW_SLIDE),
-				CountEvictor.of(WINDOW_SIZE),
+				CountTrigger.of(windowSlide),
+				CountEvictor.of(windowSize),
 				0,
 				null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -580,8 +554,6 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -604,12 +576,11 @@ public class EvictingWindowOperatorTest {
 	public void testCountTriggerWithApply() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
 
-		final int WINDOW_SIZE = 4;
-		final int WINDOW_SLIDE = 2;
+		final int windowSize = 4;
+		final int windowSlide = 2;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
-
 		@SuppressWarnings({"unchecked", "rawtypes"})
 		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
 				(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
@@ -617,7 +588,6 @@ public class EvictingWindowOperatorTest {
 		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
 				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 			GlobalWindows.create(),
 			new GlobalWindow.Serializer(),
@@ -625,12 +595,11 @@ public class EvictingWindowOperatorTest {
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
-			CountTrigger.of(WINDOW_SLIDE),
-			CountEvictor.of(WINDOW_SIZE),
+			CountTrigger.of(windowSlide),
+			CountEvictor.of(windowSize),
 			0,
 			null /* late data output tag */);
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -653,8 +622,6 @@ public class EvictingWindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
@@ -679,7 +646,7 @@ public class EvictingWindowOperatorTest {
 	public void testTumblingWindowWithApply() throws Exception {
 		AtomicInteger closeCalled = new AtomicInteger(0);
 
-		final int WINDOW_SIZE = 4;
+		final int windowSize = 4;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -691,14 +658,14 @@ public class EvictingWindowOperatorTest {
 				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
-			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 			new TimeWindow.Serializer(),
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
 			EventTimeTrigger.create(),
-			CountEvictor.of(WINDOW_SIZE),
+			CountEvictor.of(windowSize),
 			0,
 			null /* late data output tag */);
 
@@ -731,7 +698,6 @@ public class EvictingWindowOperatorTest {
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
 		expectedOutput.add(new Watermark(3999));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
 			new EvictingWindowOperatorTest.ResultSortComparator());
 		testHarness.close();
@@ -741,10 +707,9 @@ public class EvictingWindowOperatorTest {
 	//  UDFs
 	// ------------------------------------------------------------------------
 
-	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+	private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
-
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
 				Tuple2<String, Integer> value2) throws Exception {
@@ -752,7 +717,7 @@ public class EvictingWindowOperatorTest {
 		}
 	}
 
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+	private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
index 37fad7e..9292bfb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,15 +24,18 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link GlobalWindows}
+ * Tests for {@link GlobalWindows}.
  */
 public class GlobalWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
index 8786c4e..8eb26ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapPutIfAbsentTest {
 
 	@Test
@@ -38,7 +41,7 @@ public class KeyMapPutIfAbsentTest {
 				factory.set(2 * i + 1);
 				map.putIfAbsent(i, factory);
 
-				assertEquals(i+1, map.size());
+				assertEquals(i + 1, map.size());
 				assertTrue(map.getCurrentTableCapacity() > map.size());
 				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
 				assertTrue(map.size() <= map.getRehashThreshold());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
index 5b59bea..522c691 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
@@ -29,6 +29,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapPutTest {
 
 	@Test
@@ -41,7 +44,7 @@ public class KeyMapPutTest {
 			for (int i = 0; i < numElements; i++) {
 				map.put(i, 2 * i + 1);
 
-				assertEquals(i+1, map.size());
+				assertEquals(i + 1, map.size());
 				assertTrue(map.getCurrentTableCapacity() > map.size());
 				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
 				assertTrue(map.size() <= map.getRehashThreshold());
@@ -72,7 +75,6 @@ public class KeyMapPutTest {
 			assertEquals(numElements, numContained);
 			assertEquals(numElements, bitset.cardinality());
 
-
 			assertEquals(numElements, map.size());
 			assertEquals(numElements, map.traverseAndCountElements());
 			assertEquals(1 << 21, map.getCurrentTableCapacity());
@@ -91,18 +93,18 @@ public class KeyMapPutTest {
 			final int numElements = 1000000;
 
 			for (int i = 0; i < numElements; i++) {
-				Integer put = map.put(i, 2*i+1);
+				Integer put = map.put(i, 2 * i + 1);
 				assertNull(put);
 			}
 
 			for (int i = 0; i < numElements; i += 3) {
-				Integer put = map.put(i, 2*i);
+				Integer put = map.put(i, 2 * i);
 				assertNotNull(put);
-				assertEquals(2*i+1, put.intValue());
+				assertEquals(2 * i + 1, put.intValue());
 			}
 
 			for (int i = 0; i < numElements; i++) {
-				int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
+				int expected = (i % 3 == 0) ? (2 * i) : (2 * i + 1);
 				assertEquals(expected, map.get(i).intValue());
 			}
 
@@ -111,14 +113,13 @@ public class KeyMapPutTest {
 			assertEquals(1 << 21, map.getCurrentTableCapacity());
 			assertTrue(map.getLongestChainLength() <= 7);
 
-
 			BitSet bitset = new BitSet();
 			int numContained = 0;
 			for (KeyMap.Entry<Integer, Integer> entry : map) {
 				numContained++;
 
 				int key = entry.getKey();
-				int expected = key % 3 == 0 ? (2*key) : (2*key+1);
+				int expected = key % 3 == 0 ? (2 * key) : (2 * key + 1);
 
 				assertEquals(expected, entry.getValue().intValue());
 				assertFalse(bitset.get(key));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
index c7fb108..a442466 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
@@ -24,8 +24,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapTest {
 
 	@Test
@@ -218,7 +225,6 @@ public class KeyMapTest {
 				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
 			}
 
-
 			// check that all maps contain the total number of elements
 			int numContained = 0;
 			for (KeyMap<?, ?> map : maps) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index aa9cb91..0c45d03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,11 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -44,9 +45,16 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations
@@ -152,7 +160,6 @@ public class MergingWindowSetTest {
 		assertEquals(new TimeWindow(11, 14), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
 		assertFalse(mergeFunction.hasMerged());
 
-
 		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
 
 		assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(11, 14)));
@@ -179,7 +186,6 @@ public class MergingWindowSetTest {
 		assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction));
 		assertFalse(mergeFunction.hasMerged());
 
-
 		assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6)));
 
 		assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(10, 15)));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
index 461b5fc..ceda3b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -36,13 +37,23 @@ import java.util.Collection;
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link ProcessingTimeSessionWindows}
+ * Tests for {@link ProcessingTimeSessionWindows}.
  */
 public class ProcessingTimeSessionWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
index a0c2347..791eb42 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
index 4302d4d..7139a44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -22,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
@@ -35,7 +37,12 @@ import static org.apache.flink.streaming.runtime.operators.windowing.WindowOpera
 import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link PurgingTrigger}.
@@ -51,7 +58,7 @@ public class PurgingTriggerTest {
 		for (Method triggerMethod : Trigger.class.getDeclaredMethods()) {
 
 			// try retrieving the method, this will throw an exception if we can't find it
-			PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes());
+			PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(), triggerMethod.getParameterTypes());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index ff1cbdf..62c484d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -15,19 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.Arrays;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -50,11 +40,24 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
  * components: {@link WindowAssigner},
@@ -72,7 +75,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
 		InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
-
 		ReducingStateDescriptor<Integer> intReduceSumDescriptor =
 				new ReducingStateDescriptor<>(
 						"int-reduce",
@@ -89,7 +91,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 		final ValueStateDescriptor<String> valueStateDescriptor =
 				new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
 
-
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
 				createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction);
 
@@ -252,7 +253,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 		ListStateDescriptor<Integer> intListDescriptor =
 				new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
 
-
 		@SuppressWarnings("unchecked")
 		WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>(
 				assigner,

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
index 050178b..33f4747 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 4599d19..95a8314 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link SlidingEventTimeWindows}
+ * Tests for {@link SlidingEventTimeWindows}.
  */
 public class SlidingEventTimeWindowsTest extends TestLogger {
 
@@ -148,7 +151,6 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
 			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
 		}
 
-
 		try {
 			SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
 			fail("should fail");

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 20a9924..69b628a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link SlidingProcessingTimeWindows}
+ * Tests for {@link SlidingProcessingTimeWindows}.
  */
 public class SlidingProcessingTimeWindowsTest extends TestLogger {
 
@@ -157,7 +161,6 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
 			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
 		}
 
-
 		try {
 			SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
 			fail("should fail");