You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:12 UTC

[5/9] flink git commit: [FLINK-5590] [runtime] Add proper internal state hierarchy

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 628d663..9b8af58 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.commons.math3.util.ArithmeticUtils;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
@@ -42,6 +42,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAppendingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -107,22 +110,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected final WindowAssigner<? super IN, W> windowAssigner;
 
-	protected final KeySelector<IN, K> keySelector;
-
-	protected final Trigger<? super IN, ? super W> trigger;
+	private final KeySelector<IN, K> keySelector;
 
-	protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
+	private final Trigger<? super IN, ? super W> trigger;
 
-	protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
+	private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
 
-	/**
-	 * For serializing the key in checkpoints.
-	 */
+	/** For serializing the key in checkpoints. */
 	protected final TypeSerializer<K> keySerializer;
 
-	/**
-	 * For serializing the window in checkpoints.
-	 */
+	/** For serializing the window in checkpoints. */
 	protected final TypeSerializer<W> windowSerializer;
 
 	/**
@@ -133,15 +130,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 *         {@code window.maxTimestamp + allowedLateness} landmark.
 	 * </ul>
 	 */
-	protected final long allowedLateness;
+	private final long allowedLateness;
 
 	// ------------------------------------------------------------------------
 	// State that is not checkpointed
 	// ------------------------------------------------------------------------
 
-	/**
-	 * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp.
-	 */
+	/** The state in which the window contents is stored. Each window is a namespace */
+	private transient InternalAppendingState<W, IN, ACC> windowState;
+
+	/** The {@link #windowState}, typed to merging state for merging windows.
+	 * Null if the window state is not mergeable */
+	private transient InternalMergingState<W, IN, ACC> windowMergingState;
+
+	/** The state that holds the merging window metadata (the sets that describe what is merged) */
+	private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState;
+
+	/** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
 	protected transient Context context = new Context(null, null);
@@ -234,14 +239,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.allowedLateness = allowedLateness;
 		this.legacyWindowOperatorType = legacyWindowOperatorType;
 
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			@SuppressWarnings({"unchecked", "rawtypes"})
-			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
-			mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
-		} else {
-			mergingWindowsDescriptor = null;
-		}
-
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
@@ -263,6 +260,43 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			}
 		};
 
+		// create (or restore) the state that hold the actual window contents
+		// NOTE - the state may be null in the case of the overriding evicting window operator
+		if (windowStateDescriptor != null) {
+			windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
+		}
+
+		// create the typed and helper states for merging windows
+		if (windowAssigner instanceof MergingWindowAssigner) {
+
+			// store a typed reference for the state of merging windows - sanity check
+			if (windowState instanceof InternalMergingState) {
+				windowMergingState = (InternalMergingState<W, IN, ACC>) windowState;
+			}
+			// TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
+			// TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
+			// TODO activate the sanity check once resolved
+//			else if (windowState != null) {
+//				throw new IllegalStateException(
+//						"The window uses a merging assigner, but the window state is not mergeable.");
+//			}
+
+			@SuppressWarnings("unchecked")
+			final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
+
+			final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
+					typedTuple,
+					new TypeSerializer[] {windowSerializer, windowSerializer} );
+
+			final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
+					new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+
+			// get the state that stores the merging sets
+			mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>) 
+					getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
+			mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
+		}
+
 		registerRestoredLegacyStateState();
 	}
 
@@ -283,12 +317,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(
+		final Collection<W> elementWindows = windowAssigner.assignWindows(
 			element.getValue(), element.getTimestamp(), windowAssignerContext);
-
-		final K key = (K) getKeyedStateBackend().getCurrentKey();
+		
+		final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
@@ -315,11 +348,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 						}
 
 						// merge the merged state windows into the newly resulting state window
-						getKeyedStateBackend().mergePartitionedStates(
-							stateWindowResult,
-							mergedStateWindows,
-							windowSerializer,
-							(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
+						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
 					}
 				});
 
@@ -334,8 +363,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
 				}
 
-				AppendingState<IN, ACC> windowState =
-						getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				windowState.setCurrentNamespace(stateWindow);
 				windowState.add(element.getValue());
 
 				context.key = key;
@@ -368,8 +396,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					continue;
 				}
 
-				AppendingState<IN, ACC> windowState =
-						getPartitionedState(window, windowSerializer, windowStateDescriptor);
+				windowState.setCurrentNamespace(window);
 				windowState.add(element.getValue());
 
 				context.key = key;
@@ -399,8 +426,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		context.key = timer.getKey();
 		context.window = timer.getNamespace();
 
-		AppendingState<IN, ACC> windowState;
-		MergingWindowSet<W> mergingWindows = null;
+		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
@@ -411,12 +437,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				// so it is safe to just ignore
 				return;
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+			
+			windowState.setCurrentNamespace(stateWindow);
 		} else {
-			windowState = getPartitionedState(
-					context.window,
-					windowSerializer,
-					windowStateDescriptor);
+			windowState.setCurrentNamespace(context.window);
+			mergingWindows = null;
 		}
 
 		ACC contents = windowState.get();
@@ -440,8 +465,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		context.key = timer.getKey();
 		context.window = timer.getNamespace();
 
-		AppendingState<IN, ACC> windowState;
-		MergingWindowSet<W> mergingWindows = null;
+		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
@@ -452,9 +476,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				// so it is safe to just ignore
 				return;
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+			windowState.setCurrentNamespace(stateWindow);
 		} else {
-			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+			windowState.setCurrentNamespace(context.window);
+			mergingWindows = null;
 		}
 
 		ACC contents = windowState.get();
@@ -507,13 +532,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * {@link MergingWindowSet#persist()}.
 	 */
 	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
-		ListState<Tuple2<W, W>> mergeState =
-				getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
-
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		MergingWindowSet<W> mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState);
-
-		return mergingWindows;
+		@SuppressWarnings("unchecked")
+		MergingWindowAssigner<? super IN, W> mergingAssigner = (MergingWindowAssigner<? super IN, W>) windowAssigner;
+		return new MergingWindowSet<>(mergingAssigner, mergingSetsState);
 	}
 
 	/**
@@ -655,11 +676,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
 			if (mergedWindows != null && mergedWindows.size() > 0) {
 				try {
-					WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(window,
-							mergedWindows,
-							windowSerializer,
-							stateDescriptor);
-				} catch (Exception e) {
+					S rawState = getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor);
+
+					if (rawState instanceof InternalMergingState) {
+						@SuppressWarnings("unchecked")
+						InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+						mergingState.mergeNamespaces(window, mergedWindows);
+					}
+					else {
+						throw new IllegalArgumentException(
+								"The given state descriptor does not refer to a mergeable state (MergingState)");
+					}
+				}
+				catch (Exception e) {
 					throw new RuntimeException("Error while merging state.", e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 0e2d1e8..2faa506 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -83,6 +84,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
 public class WindowOperatorTest extends TestLogger {
 
 	// For counting if close() is called the correct number of times on the SumReducer
@@ -758,7 +760,7 @@ public class WindowOperatorTest extends TestLogger {
 				0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -846,7 +848,7 @@ public class WindowOperatorTest extends TestLogger {
 				0);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1124,7 +1126,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
 		testHarness.open();
 		
@@ -1184,7 +1186,7 @@ public class WindowOperatorTest extends TestLogger {
 					LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 		
 		testHarness.open();
 
@@ -1250,7 +1252,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -1310,7 +1312,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 		
@@ -1385,7 +1387,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 		
@@ -1475,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 		
@@ -1559,7 +1561,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -1643,7 +1645,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 		
@@ -1736,7 +1738,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 		
@@ -1821,7 +1823,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -1902,11 +1904,11 @@ public class WindowOperatorTest extends TestLogger {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowStateDesc,
 				new InternalIterableWindowFunction<>(new PassThroughFunction2()),
-				new EventTimeTriggerAccumGC(LATENESS),
+					new EventTimeTriggerAccumGC(LATENESS),
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -1929,7 +1931,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
-	private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
+	private static class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -1960,7 +1962,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2006,7 +2008,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2063,7 +2065,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2108,7 +2110,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2152,7 +2154,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2172,6 +2174,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
+	// TODO this test seems invalid, as it uses the unsupported combination of merging windows and folding window state
 	@Test
 	public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
 		final int GAP_SIZE = 3;
@@ -2206,7 +2209,7 @@ public class WindowOperatorTest extends TestLogger {
 				LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
 
@@ -2230,7 +2233,7 @@ public class WindowOperatorTest extends TestLogger {
 	//  UDFs
 	// ------------------------------------------------------------------------
 
-	private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
+	private static class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -2289,7 +2292,7 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static class Tuple2ResultSortComparator implements Comparator<Object> {
+	private static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable {
 		@Override
 		public int compare(Object o1, Object o2) {
 			if (o1 instanceof Watermark || o2 instanceof Watermark) {
@@ -2311,7 +2314,7 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static class Tuple3ResultSortComparator implements Comparator<Object> {
+	private static class Tuple3ResultSortComparator implements Comparator<Object>, Serializable {
 		@Override
 		public int compare(Object o1, Object o2) {
 			if (o1 instanceof Watermark || o2 instanceof Watermark) {
@@ -2403,15 +2406,11 @@ public class WindowOperatorTest extends TestLogger {
 	 * purge the state of the fired window. This is to test the state
 	 * garbage collection mechanism.
 	 */
-	public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
+	public static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		private long cleanupTime;
 
-		private EventTimeTriggerAccumGC() {
-			cleanupTime = 0L;
-		}
-
 		public EventTimeTriggerAccumGC(long cleanupTime) {
 			this.cleanupTime = cleanupTime;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 0e1aca0..0562443 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,9 +18,7 @@
 
 package org.apache.flink.test.query;
 
-
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +28,14 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
@@ -59,14 +61,18 @@ public final class KVStateRequestSerializerRocksDBTest {
 	 */
 	final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
 
-		RocksDBKeyedStateBackend2(final JobID jobId,
-			final String operatorIdentifier,
-			final ClassLoader userCodeClassLoader,
-			final File instanceBasePath, final DBOptions dbOptions,
-			final ColumnFamilyOptions columnFamilyOptions,
-			final TaskKvStateRegistry kvStateRegistry,
-			final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
-			final KeyGroupRange keyGroupRange) throws Exception {
+		RocksDBKeyedStateBackend2(
+				final JobID jobId,
+				final String operatorIdentifier,
+				final ClassLoader userCodeClassLoader,
+				final File instanceBasePath,
+				final DBOptions dbOptions,
+				final ColumnFamilyOptions columnFamilyOptions,
+				final TaskKvStateRegistry kvStateRegistry,
+				final TypeSerializer<K> keySerializer,
+				final int numberOfKeyGroups,
+				final KeyGroupRange keyGroupRange) throws Exception {
+
 			super(jobId, operatorIdentifier, userCodeClassLoader,
 				instanceBasePath,
 				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
@@ -74,9 +80,10 @@ public final class KVStateRequestSerializerRocksDBTest {
 		}
 
 		@Override
-		public <N, T> ListState<T> createListState(
+		public <N, T> InternalListState<N, T> createListState(
 			final TypeSerializer<N> namespaceSerializer,
 			final ListStateDescriptor<T> stateDesc) throws Exception {
+
 			return super.createListState(namespaceSerializer, stateDesc);
 		}
 	}
@@ -90,8 +97,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 	 */
 	@Test
 	public void testListSerialization() throws Exception {
-		final long key = 0l;
-		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+		final long key = 0L;
 
 		// objects for RocksDB state list serialisation
 		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
@@ -110,9 +116,10 @@ public final class KVStateRequestSerializerRocksDBTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final ListState<Long> listState = longHeapKeyedStateBackend
+		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
 			.createListState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
 		KvStateRequestSerializerTest.testListSerialization(key, listState);
 	}
 }