You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/22 21:50:22 UTC

[03/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
index bb07996..ada4d6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -30,150 +32,146 @@ import org.hamcrest.TypeSafeMatcher;
  */
 public class StreamRecordMatchers {
 
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-          T value) {
-
-    return isStreamRecord(Matchers.equalTo(value));
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-      T value,
-      long timestamp) {
-
-    return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-          Matcher<? super T> valueMatcher) {
-    return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-      Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
-    return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
-  }
-
-  public static Matcher<TimeWindow> timeWindow(long start, long end) {
-    return Matchers.equalTo(new TimeWindow(start, end));
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @SafeVarargs
-  public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
-    return (Matcher) Matchers.containsInAnyOrder(windows);
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value) {
-    return isWindowedValue(Matchers.equalTo(value));
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value,
-          long timestamp) {
-    return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value,
-          long timestamp,
-          W window) {
-    return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp, W window) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
-  }
-
-
-  private StreamRecordMatchers() {}
-
-  private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
-
-    private Matcher<? super T> valueMatcher;
-    private Matcher<? super Long> timestampMatcher;
-
-    private StreamRecordMatcher(
-        Matcher<? super T> valueMatcher,
-        Matcher<? super Long> timestampMatcher) {
-      this.valueMatcher = valueMatcher;
-      this.timestampMatcher = timestampMatcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-          .appendText("a StreamRecordValue(").appendValue(valueMatcher)
-          .appendText(", ").appendValue(timestampMatcher)
-          .appendText(")");
-    }
-
-    @Override
-    protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
-      return valueMatcher.matches(streamRecord.getValue())
-              && timestampMatcher.matches(streamRecord.getTimestamp());
-    }
-  }
-
-  private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
-
-    private Matcher<? super T> valueMatcher;
-    private Matcher<? super Long> timestampMatcher;
-    private Matcher<? super W> windowMatcher;
-
-
-    private WindowedValueMatcher(
-            Matcher<? super T> valueMatcher,
-            Matcher<? super Long> timestampMatcher,
-            Matcher<? super W> windowMatcher) {
-      this.valueMatcher = valueMatcher;
-      this.timestampMatcher = timestampMatcher;
-      this.windowMatcher = windowMatcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-              .appendText("a WindowedValue(").appendValue(valueMatcher)
-              .appendText(", ").appendValue(timestampMatcher)
-              .appendText(", ").appendValue(timestampMatcher)
-              .appendText(")");
-    }
-
-    @Override
-    protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
-      return valueMatcher.matches(streamRecord.getValue().value())
-              && timestampMatcher.matches(streamRecord.getTimestamp())
-              && windowMatcher.matches(streamRecord.getValue().window());
-    }
-  }
+	public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+		T value) {
+
+		return isStreamRecord(Matchers.equalTo(value));
+	}
+
+	public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+		T value,
+		long timestamp) {
+
+		return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+	}
+
+	public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+		Matcher<? super T> valueMatcher) {
+		return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
+	}
+
+	public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+		Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+		return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
+	}
+
+	public static Matcher<TimeWindow> timeWindow(long start, long end) {
+		return Matchers.equalTo(new TimeWindow(start, end));
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	@SafeVarargs
+	public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) {
+		return (Matcher) Matchers.containsInAnyOrder(windows);
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		T value) {
+		return isWindowedValue(Matchers.equalTo(value));
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		T value,
+		long timestamp) {
+		return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp));
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		T value,
+		long timestamp,
+		W window) {
+		return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher, long timestamp) {
+		return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything());
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher, long timestamp, W window) {
+		return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window));
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher) {
+		return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) {
+		return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) {
+		return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher);
+	}
+
+	public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+		Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) {
+		return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher);
+	}
+
+	private StreamRecordMatchers() {
+	}
+
+	private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> {
+
+		private Matcher<? super T> valueMatcher;
+		private Matcher<? super Long> timestampMatcher;
+
+		private StreamRecordMatcher(
+			Matcher<? super T> valueMatcher,
+			Matcher<? super Long> timestampMatcher) {
+			this.valueMatcher = valueMatcher;
+			this.timestampMatcher = timestampMatcher;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description
+				.appendText("a StreamRecordValue(").appendValue(valueMatcher)
+				.appendText(", ").appendValue(timestampMatcher)
+				.appendText(")");
+		}
+
+		@Override
+		protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
+			return valueMatcher.matches(streamRecord.getValue())
+				&& timestampMatcher.matches(streamRecord.getTimestamp());
+		}
+	}
+
+	private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> {
+
+		private Matcher<? super T> valueMatcher;
+		private Matcher<? super Long> timestampMatcher;
+		private Matcher<? super W> windowMatcher;
+
+		private WindowedValueMatcher(
+			Matcher<? super T> valueMatcher,
+			Matcher<? super Long> timestampMatcher,
+			Matcher<? super W> windowMatcher) {
+			this.valueMatcher = valueMatcher;
+			this.timestampMatcher = timestampMatcher;
+			this.windowMatcher = windowMatcher;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description
+				.appendText("a WindowedValue(").appendValue(valueMatcher)
+				.appendText(", ").appendValue(timestampMatcher)
+				.appendText(", ").appendValue(timestampMatcher)
+				.appendText(")");
+		}
+
+		@Override
+		protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) {
+			return valueMatcher.matches(streamRecord.getValue().value())
+				&& timestampMatcher.matches(streamRecord.getTimestamp())
+				&& windowMatcher.matches(streamRecord.getValue().window());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 341171d..f4e0f1d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -19,41 +19,45 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Tests for {@link TimeWindow}.
+ */
 public class TimeWindowTest {
 	@Test
 	public void testGetWindowStartWithOffset() {
-		//[0,7),[7,14),[14,21)...
+		// [0, 7), [7, 14), [14, 21)...
 		long offset = 0;
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7);
 
-		//[-4,3),[3,10),[10,17)...
+		// [-4, 3), [3, 10), [10, 17)...
 		offset = 3;
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10);
 
-		//[-2,5),[5,12),[12,19)...
+		// [-2, 5), [5, 12), [12, 19)...
 		offset = -2;
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4, offset, 7), -2);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 5);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12, offset, 7), 12);
 
 		// for GMT+8:00
-		offset = - TimeUnit.HOURS.toMillis(8);
+		offset = -TimeUnit.HOURS.toMillis(8);
 		long size = TimeUnit.DAYS.toMillis(1);
-		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l);
+		Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450L, offset, size), 1470844800000L);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index df65ca2..dc0e21c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -28,8 +29,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
@@ -161,7 +162,7 @@ public class TimeWindowTranslationTest {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
 				.reduce(new DummyReducer());
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -185,7 +186,7 @@ public class TimeWindowTranslationTest {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
 				.fold(new Tuple2<>("", 1), new DummyFolder());
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -235,7 +236,7 @@ public class TimeWindowTranslationTest {
 	 * These tests ensure that the fast aligned time windows operator is used if the
 	 * conditions are right.
 	 *
-	 * TODO: update once the fast aligned time windows operator is in
+	 * <p>TODO: update once the fast aligned time windows operator is in
 	 */
 	@Ignore
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 4267444..5aa47e8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.JobID;
@@ -36,9 +37,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.TestInternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -123,7 +124,7 @@ public class TriggerTestHarness<T, W extends Window> {
 
 	/**
 	 * Injects one element into the trigger for the given window and returns the result of
-	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}.
 	 */
 	public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
 		TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 2373a86..9e4669f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,17 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link TumblingEventTimeWindows}
+ * Tests for {@link TumblingEventTimeWindows}.
  */
 public class TumblingEventTimeWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index 348b6fa..a611fc0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link TumblingProcessingTimeWindows}
+ * Tests for {@link TumblingProcessingTimeWindows}.
  */
 public class TumblingProcessingTimeWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 0d80605..8ceda45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -15,38 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
 
-import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -64,6 +38,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -73,6 +48,31 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Base for window operator tests that verify correct interaction with the other windowing
  * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
@@ -128,7 +128,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		return mockAssigner;
 	}
 
-
 	static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception {
 		@SuppressWarnings("unchecked")
 		MergingWindowAssigner<T, TimeWindow> mockAssigner = mock(MergingWindowAssigner.class);
@@ -139,7 +138,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		return mockAssigner;
 	}
 
-
 	static WindowAssigner.WindowAssignerContext anyAssignerContext() {
 		return Mockito.any();
 	}
@@ -177,7 +175,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		return Mockito.any();
 	}
 
-
 	static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
 		doAnswer(new Answer<TriggerResult>() {
 			@Override
@@ -369,7 +366,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	}
 
-
 	@Test
 	public void testAssignerIsInvokedOncePerElement() throws Exception {
 
@@ -540,7 +536,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testEmittingFromWindowFunction(new ProcessingTimeAdaptor());
 	}
 
-
 	private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -1258,7 +1253,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testTimerFiring(new ProcessingTimeAdaptor());
 	}
 
-
 	private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception {
 
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
@@ -1382,8 +1376,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
 
 		verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
-
-
 	}
 
 	@Test
@@ -2392,7 +2384,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		doAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
 				context.windowState().getState(valueStateDescriptor).update("hello");
 				return null;
 			}
@@ -2401,7 +2393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		doAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1];
 				context.windowState().getState(valueStateDescriptor).clear();
 				return null;
 			}
@@ -2441,7 +2433,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		doAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
 				context.windowState().getState(valueStateDescriptor).update("hello");
 				return null;
 			}
@@ -2481,7 +2473,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		doAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
 				timeAdaptor.verifyCorrectTime(testHarness, context);
 				return null;
 			}
@@ -2490,7 +2482,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		doAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1];
 				timeAdaptor.verifyCorrectTime(testHarness, context);
 				return null;
 			}
@@ -2520,7 +2512,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			long allowedLatenss,
 			InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception;
 
-
 	private interface TimeDomainAdaptor {
 
 		void setIsEventTime(WindowAssigner<?, ?> mockAssigner);
@@ -2535,9 +2526,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		int numTimersOtherDomain(AbstractStreamOperatorTestHarness testHarness);
 
-		void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception;
+		void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception;
 
-		void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception;
+		void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception;
 
 		void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
index 9ec1923..904a8b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.net.URL;
@@ -63,8 +65,7 @@ import static org.junit.Assert.fail;
  * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done
  * using the Flink 1.1 {@link WindowOperator}.
  *
- * <p>
- * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1
+ * <p>This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1
  * aligned processing-time windows operator.
  *
  * <p>For regenerating the binary snapshot file you have to run the commented out portion
@@ -85,7 +86,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@SuppressWarnings("unchecked")
 	public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception {
 
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -93,7 +94,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -131,7 +132,6 @@ public class WindowOperatorFrom11MigrationTest {
 		testHarness.close();
         */
 
-
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -146,7 +146,6 @@ public class WindowOperatorFrom11MigrationTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
 		// add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire
@@ -167,7 +166,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@SuppressWarnings("unchecked")
 	public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws Exception {
 
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -175,7 +174,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -244,7 +243,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -253,7 +252,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -334,7 +333,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -342,7 +341,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -423,7 +422,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -432,7 +431,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +499,7 @@ public class WindowOperatorFrom11MigrationTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -508,7 +507,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -622,7 +621,7 @@ public class WindowOperatorFrom11MigrationTest {
 
 		*/
 
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -631,7 +630,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -725,7 +724,7 @@ public class WindowOperatorFrom11MigrationTest {
 		testHarness.close();
 
 		*/
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -734,7 +733,7 @@ public class WindowOperatorFrom11MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -774,7 +773,6 @@ public class WindowOperatorFrom11MigrationTest {
 		testHarness.close();
 	}
 
-
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -832,7 +830,7 @@ public class WindowOperatorFrom11MigrationTest {
 		}
 	}
 
-	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+	private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -841,7 +839,7 @@ public class WindowOperatorFrom11MigrationTest {
 		}
 	}
 
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+	private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled = false;
@@ -877,7 +875,7 @@ public class WindowOperatorFrom11MigrationTest {
 
 	}
 
-	public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+	private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
index 0d3a6dc..6e9db1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
-import static org.junit.Assert.fail;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -57,9 +53,16 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
 /**
  * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done
  * using the Flink 1.2 {@link WindowOperator}.
@@ -78,7 +81,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Ignore
 	@Test
 	public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception {
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -86,7 +89,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -122,7 +125,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Test
 	public void testRestoreSessionWindowsWithCountTrigger() throws Exception {
 
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -130,7 +133,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -157,7 +160,6 @@ public class WindowOperatorFrom12MigrationTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
 
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
 		// add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire
@@ -177,7 +179,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Test
 	public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception {
 
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -185,7 +187,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -215,7 +217,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Test
 	public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception {
 
-		final int SESSION_SIZE = 3;
+		final int sessionSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -223,7 +225,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -280,7 +282,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Ignore
 	@Test
 	public void writeReducingEventTimeWindowsSnapshot() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -289,7 +291,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -336,7 +338,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreReducingEventTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -345,7 +347,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -391,7 +393,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Ignore
 	@Test
 	public void writeApplyEventTimeWindowsSnapshot() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -399,7 +401,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -446,7 +448,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreApplyEventTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -454,7 +456,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +502,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Ignore
 	@Test
 	public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -509,7 +511,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -550,7 +552,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreReducingProcessingTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -559,7 +561,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -600,7 +602,7 @@ public class WindowOperatorFrom12MigrationTest {
 	@Ignore
 	@Test
 	public void writeApplyProcessingTimeWindowsSnapshot() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -608,7 +610,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -648,7 +650,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreApplyProcessingTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -656,7 +658,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -738,7 +740,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -747,7 +749,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -841,7 +843,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	@Test
 	public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception {
-		final int WINDOW_SIZE = 3;
+		final int windowSize = 3;
 
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -850,7 +852,7 @@ public class WindowOperatorFrom12MigrationTest {
 				inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -892,7 +894,6 @@ public class WindowOperatorFrom12MigrationTest {
 		testHarness.close();
 	}
 
-
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -950,7 +951,7 @@ public class WindowOperatorFrom12MigrationTest {
 		}
 	}
 
-	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+	private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -959,7 +960,7 @@ public class WindowOperatorFrom12MigrationTest {
 		}
 	}
 
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+	private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled = false;
@@ -995,7 +996,7 @@ public class WindowOperatorFrom12MigrationTest {
 
 	}
 
-	public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+	private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override