You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:13 UTC

[6/9] flink git commit: [FLINK-5590] [runtime] Add proper internal state hierarchy

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 9552325..f0eb53e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
@@ -39,8 +40,8 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
-		implements ListState<V> {
+		extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+		implements InternalListState<N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -59,6 +60,10 @@ public class HeapListState<K, N, V>
 		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
 	@Override
 	public Iterable<V> get() {
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");
@@ -154,4 +159,14 @@ public class HeapListState<K, N, V>
 
 		return baos.toByteArray();
 	}
+
+	// ------------------------------------------------------------------------
+	//  state merging
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+		a.addAll(b);
+		return a;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 37aa812..7804cb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -37,8 +38,8 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapReducingState<K, N, V>
-		extends AbstractHeapState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
-		implements ReducingState<V> {
+		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
+		implements InternalReducingState<N, V> {
 
 	private final ReduceFunction<V> reduceFunction;
 
@@ -56,10 +57,15 @@ public class HeapReducingState<K, N, V>
 			StateTable<K, N, V> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
+
 		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
 		this.reduceFunction = stateDesc.getReduceFunction();
 	}
 
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
 	@Override
 	public V get() {
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");
@@ -111,13 +117,22 @@ public class HeapReducingState<K, N, V>
 		if (currentValue == null) {
 			// we're good, just added the new value
 		} else {
-			V reducedValue = null;
+			V reducedValue;
 			try {
 				reducedValue = reduceFunction.reduce(currentValue, value);
 			} catch (Exception e) {
-				throw new RuntimeException("Could not add value to reducing state.", e);
+				throw new IOException("Exception while applying ReduceFunction in reducing state", e);
 			}
 			keyedMap.put(backend.<K>getCurrentKey(), reducedValue);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  state merging
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected V mergeState(V a, V b) throws Exception {
+		return reduceFunction.reduce(a, b);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index cccaacb..9e042fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
@@ -36,7 +37,7 @@ import java.util.Map;
  */
 public class HeapValueState<K, N, V>
 		extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
-		implements ValueState<V> {
+		implements InternalValueState<N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 797150a..9d7232e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -78,4 +79,21 @@ public class StateTable<K, N, ST> {
 	public List<Map<N, Map<K, ST>>> getState() {
 		return state;
 	}
+
+	// ------------------------------------------------------------------------
+	//  for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	boolean isEmpty() {
+		for (Map<N, Map<K, ST>> map : state) {
+			if (map != null) {
+				if (!map.isEmpty()) {
+					return false;
+				}
+			}
+		}
+
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
new file mode 100644
index 0000000..ae9f457
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.AppendingState;
+
+/**
+ * The peer to the {@link AppendingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N>   The type of the namespace
+ * @param <IN>  The type of elements added to the state
+ * @param <OUT> The type of the value returned from the state
+ */
+public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
new file mode 100644
index 0000000..eb58ce5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.FoldingState;
+
+/**
+ * The peer to the {@link FoldingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
new file mode 100644
index 0000000..06f64b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.State;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ * 
+ * <p>The internal state classes give access to the namespace getters and setters and access to
+ * additional functionality, like raw value access or state merging.
+ * 
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
+ * intended to be used by user applications. These internal methods are of limited use to users,
+ * would mostly be confusing to them, and are usually not regarded as stable across releases.
+ * 
+ * <p>Each specific type in the internal state hierarchy extends the type from the public
+ * state hierarchy:
+ * 
+ * <pre>
+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *          MergingState                   |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      +-----------+   +----------- | -----------------InternalListState
+ *                  |                |
+ *                  +---------InternalReducingState
+ * </pre>
+ * 
+ * @param <N> The type of the namespace.
+ */
+public interface InternalKvState<N> extends State {
+
+	/**
+	 * Sets the current namespace, which will be used when using the state access methods.
+	 *
+	 * @param namespace The namespace.
+	 */
+	void setCurrentNamespace(N namespace);
+
+	/**
+	 * Returns the serialized value for the given key and namespace.
+	 *
+	 * <p>If no value is associated with key and namespace, <code>null</code>
+	 * is returned.
+	 *
+	 * @param serializedKeyAndNamespace Serialized key and namespace
+	 * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
+	 * 
+	 * @throws Exception Exceptions during serialization are forwarded
+	 */
+	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
new file mode 100644
index 0000000..ae392ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ListState;
+
+/**
+ * The peer to the {@link ListState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of elements in the list
+ */
+public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
new file mode 100644
index 0000000..abc7d7c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.MergingState;
+
+import java.util.Collection;
+
+/**
+ * The peer to the {@link MergingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N>   The type of the namespace
+ * @param <IN>  The type of elements added to the state
+ * @param <OUT> The type of the value returned from the state
+ */
+public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+
+	/**
+	 * Merges the state of the current key for the given source namespaces into the state of
+	 * the target namespace.
+	 * 
+	 * @param target The target namespace where the merged state should be stored.
+	 * @param sources The source namespaces whose state should be merged.
+	 * 
+	 * @throws Exception The method may forward exceptions thrown internally (by I/O or functions).
+	 */
+	void mergeNamespaces(N target, Collection<N> sources) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
new file mode 100644
index 0000000..40e625c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ReducingState;
+
+/**
+ * The peer to the {@link ReducingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of the values added to and held in the reducing state
+ */
+public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
new file mode 100644
index 0000000..7177b8a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ValueState;
+
+/**
+ * The peer to the {@link ValueState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of the value held in the state
+ */
+public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 86f8766..1c02ca1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -593,7 +593,7 @@ public class KvStateClientTest {
 				state.update(201 + i);
 
 				// we know it must be a KvStat but this is not exposed to the user via State
-				KvState<?> kvState = (KvState<?>) state;
+				InternalKvState<?> kvState = (InternalKvState<?>) state;
 
 				// Register KvState (one state instance for all server)
 				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index b1ec86f..202024c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -40,9 +40,9 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -267,7 +267,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the failure response on a failure on the {@link KvState#getSerializedValue(byte[])}
+	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])}
 	 * call.
 	 */
 	@Test
@@ -279,7 +279,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Failing KvState
-		KvState<?> kvState = mock(KvState.class);
+		InternalKvState<?> kvState = mock(InternalKvState.class);
 		when(kvState.getSerializedValue(any(byte[].class)))
 				.thenThrow(new RuntimeException("Expected test Exception"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 0d9c2e4..69dbe6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.query.netty.message;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import org.apache.flink.api.common.state.ListState;
+
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -315,7 +317,7 @@ public class KvStateRequestSerializerTest {
 	 */
 	@Test
 	public void testListSerialization() throws Exception {
-		final long key = 0l;
+		final long key = 0L;
 
 		// objects for heap state list serialisation
 		final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
@@ -327,9 +329,10 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final ListState<Long> listState = longHeapKeyedStateBackend
-			.createListState(VoidNamespaceSerializer.INSTANCE,
+		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
+				VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
 		testListSerialization(key, listState);
 	}
 
@@ -340,19 +343,16 @@ public class KvStateRequestSerializerTest {
 	 * @param key
 	 * 		key of the list state
 	 * @param listState
-	 * 		list state using the {@link VoidNamespace}, must also be a {@link
-	 * 		KvState} instance
+	 * 		list state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
 	 *
 	 * @throws Exception
 	 */
-	public static void testListSerialization(final long key,
-		final ListState<Long> listState) throws Exception {
+	public static void testListSerialization(
+			final long key,
+			final InternalListState<VoidNamespace, Long> listState) throws Exception {
 
 		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
-
-		final KvState<VoidNamespace> listKvState =
-			(KvState<VoidNamespace>) listState;
-		listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+		listState.setCurrentNamespace(VoidNamespace.INSTANCE);
 
 		// List
 		final int numElements = 10;
@@ -368,8 +368,8 @@ public class KvStateRequestSerializerTest {
 			KvStateRequestSerializer.serializeKeyAndNamespace(
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-		final byte[] serializedValues =
-			listKvState.getSerializedValue(serializedKey);
+		
+		final byte[] serializedValues = listState.getSerializedValue(serializedKey);
 
 		List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
 		assertEquals(expectedValues, actualValues);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 38e04aa..c560ab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -167,7 +168,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
@@ -214,7 +215,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", restored1.value());
@@ -230,7 +231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("u1", restored2.value());
@@ -246,7 +247,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
-	 * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+	 * Tests {@link ValueState#value()} and {@link InternalKvState#getSerializedValue(byte[])}
 	 * accessing the state concurrently. They should not get in the way of each
 	 * other.
 	 */
@@ -255,7 +256,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	public void testValueStateRace() throws Exception {
 		final AbstractKeyedStateBackend<Integer> backend =
 			createKeyedBackend(IntSerializer.INSTANCE);
-		final Integer namespace = Integer.valueOf(1);
+		final Integer namespace = 1;
 
 		final ValueStateDescriptor<String> kvId =
 			new ValueStateDescriptor<>("id", String.class);
@@ -270,7 +271,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
 
 		@SuppressWarnings("unchecked")
-		final KvState<Integer> kvState = (KvState<Integer>) state;
+		final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
 
 		/**
 		 * 1) Test that ValueState#value() before and after
@@ -496,7 +497,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
 			Joiner joiner = Joiner.on(",");
 			// some modifications to the state
@@ -544,7 +545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("1", joiner.join(restored1.get()));
@@ -560,7 +561,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", joiner.join(restored2.get()));
@@ -596,7 +597,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
 			// some modifications to the state
 			backend.setCurrentKey(1);
@@ -643,7 +644,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("1", restored1.get());
@@ -659,7 +660,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", restored2.get());
@@ -698,7 +699,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
 			// some modifications to the state
 			backend.setCurrentKey(1);
@@ -746,7 +747,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,1", restored1.get());
@@ -763,7 +764,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			@SuppressWarnings("unchecked")
 			FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")
-			KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,101", restored2.get());
@@ -1254,7 +1255,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1280,7 +1281,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1311,7 +1312,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1342,7 +1343,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1451,7 +1452,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 * if it is not null.
 	 */
 	private static <V, K, N> V getSerializedValue(
-			KvState<N> kvState,
+			InternalKvState<N> kvState,
 			K key,
 			TypeSerializer<K> keySerializer,
 			N namespace,
@@ -1475,7 +1476,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 * if it is not null.
 	 */
 	private static <V, K, N> List<V> getSerializedList(
-			KvState<N> kvState,
+			InternalKvState<N> kvState,
 			K key,
 			TypeSerializer<K> keySerializer,
 			N namespace,

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
new file mode 100644
index 0000000..33d60a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link ListState}.
+ */
+public class HeapListStateTest {
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			InternalListState<VoidNamespace, Long> state =
+					keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(asList(17L, 11L), state.get());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(asList(17L, 11L), state.get());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+
+			StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
+					((HeapListState<String, VoidNamespace, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads the values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			validateResult(state.get(), expectedResult);
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			StateTable<String, Integer, ArrayList<Long>> stateTable = 
+					((HeapListState<String, Integer, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+		return new HeapKeyedStateBackend<>(
+				mock(TaskKvStateRegistry.class),
+				StringSerializer.INSTANCE,
+				HeapListStateTest.class.getClassLoader(),
+				16,
+				new KeyGroupRange(0, 15));
+	}
+	
+	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
+		int num = 0;
+		for (T v : values) {
+			num++;
+			assertTrue(expected.contains(v));
+		}
+
+		assertEquals(expected.size(), num);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
new file mode 100644
index 0000000..e0929f1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
+ */
+public class HeapReducingStateTest {
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr =
+				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			InternalReducingState<VoidNamespace, Long> state =
+					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+
+			StateTable<String, VoidNamespace, Long> stateTable =
+					((HeapReducingState<String, VoidNamespace, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
+				"my-state", new AddingFunction(), Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			final InternalReducingState<Integer, Long> state =
+					keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads the values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+			
+			StateTable<String, Integer, Long> stateTable =
+					((HeapReducingState<String, Integer, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+		return new HeapKeyedStateBackend<>(
+				mock(TaskKvStateRegistry.class),
+				StringSerializer.INSTANCE,
+				HeapReducingStateTest.class.getClassLoader(),
+				16,
+				new KeyGroupRange(0, 15));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test functions
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static class AddingFunction implements ReduceFunction<Long> {
+
+		@Override
+		public Long reduce(Long a, Long b)  {
+			return a + b;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a21660c..6bb0a40 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -494,15 +494,34 @@ public abstract class AbstractStreamOperator<OUT>
 		return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
 	}
 
+
+	protected <N, S extends State, T> S getOrCreateKeyedState(
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor) throws Exception {
+
+		if (keyedStateStore != null) {
+			return keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+		}
+		else {
+			throw new IllegalStateException("Cannot create partitioned state. " +
+					"The keyed state backend has not been set." +
+					"This indicates that the operator is not partitioned/keyed.");
+		}
+	}
+
 	/**
 	 * Creates a partitioned state handle, using the state backend configured for this task.
+	 * 
+	 * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
+	 *       This method should be removed for the sake of namespaces being lazily fetched from the keyed
+	 *       state backend, or being set on the state directly.
 	 *
 	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
-	@SuppressWarnings("unchecked")
 	protected <S extends State, N> S getPartitionedState(
-			N namespace, TypeSerializer<N> namespaceSerializer,
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, ?> stateDescriptor) throws Exception {
 
 		if (keyedStateStore != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0d5d091..05c89dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -35,7 +35,7 @@ import java.util.Set;
 
 /**
  * A {@link Window} that represents a time interval from {@code start} (inclusive) to
- * {@code start + size} (exclusive).
+ * {@code end} (exclusive).
  */
 @PublicEvolving
 public class TimeWindow extends Window {
@@ -48,14 +48,35 @@ public class TimeWindow extends Window {
 		this.end = end;
 	}
 
+	/**
+	 * Gets the starting timestamp of the window. This is the first timestamp that belongs
+	 * to this window.
+	 * 
+	 * @return The starting timestamp of this window.
+	 */
 	public long getStart() {
 		return start;
 	}
 
+	/**
+	 * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
+	 * is the first timestamp that does not belong to this window any more.
+	 * 
+	 * @return The exclusive end timestamp of this window.
+	 */
 	public long getEnd() {
 		return end;
 	}
 
+	/**
+	 * Gets the largest timestamp that still belongs to this window.
+	 * 
+	 * <p>This timestamp is identical to {@code getEnd() - 1}.
+	 * 
+	 * @return The largest timestamp that still belongs to this window.
+	 * 
+	 * @see #getEnd() 
+	 */
 	@Override
 	public long maxTimestamp() {
 		return end - 1;
@@ -104,6 +125,13 @@ public class TimeWindow extends Window {
 		return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
 	}
 
+	// ------------------------------------------------------------------------
+	// Serializer
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The serializer used to write the TimeWindow type.
+	 */
 	public static class Serializer extends TypeSerializer<TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
@@ -152,9 +180,7 @@ public class TimeWindow extends Window {
 
 		@Override
 		public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
-			long start = source.readLong();
-			long end = source.readLong();
-			return new TimeWindow(start, end);
+			return deserialize(source);
 		}
 
 		@Override
@@ -179,6 +205,10 @@ public class TimeWindow extends Window {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Merge overlapping {@link TimeWindow}s. For use by merging
 	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
index 0e131ff..b17989c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -31,5 +31,10 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public abstract class Window {
 
+	/**
+	 * Gets the largest timestamp that still belongs to this window.
+	 *
+	 * @return The largest timestamp that still belongs to this window.
+	 */
 	public abstract long maxTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8c73878..d9c977a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -41,7 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
@@ -56,40 +57,52 @@ import static java.util.Objects.requireNonNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> 
+		extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
 
 	private static final long serialVersionUID = 1L;
 
+	// ------------------------------------------------------------------------
+	// these fields are set by the API stream graph builder to configure the operator 
+	
 	private final Evictor<? super IN, ? super W> evictor;
 
+	private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
+
+	// ------------------------------------------------------------------------
+	// the fields below are instantiated once the operator runs in the runtime 
+
 	private transient EvictorContext evictorContext;
 
-	private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
+	private transient InternalListState<W, StreamRecord<IN>> evictingWindowState;
+
+	// ------------------------------------------------------------------------
 
 	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-		TypeSerializer<W> windowSerializer,
-		KeySelector<IN, K> keySelector,
-		TypeSerializer<K> keySerializer,
-		StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
-		InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
-		Trigger<? super IN, ? super W> trigger,
-		Evictor<? super IN, ? super W> evictor,
-		long allowedLateness) {
+			TypeSerializer<W> windowSerializer,
+			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
+			StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
+			InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			Evictor<? super IN, ? super W> evictor,
+			long allowedLateness) {
 
 		super(windowAssigner, windowSerializer, keySelector,
 			keySerializer, null, windowFunction, trigger, allowedLateness);
-		this.evictor = requireNonNull(evictor);
-		this.windowStateDescriptor = windowStateDescriptor;
+
+		this.evictor = checkNotNull(evictor);
+		this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Collection<W> elementWindows = windowAssigner.assignWindows(
 				element.getValue(),
 				element.getTimestamp(),
 				windowAssignerContext);
 
+		@SuppressWarnings("unchecked")
 		final K key = (K) getKeyedStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
@@ -119,11 +132,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 								}
 
 								// merge the merged state windows into the newly resulting state window
-								getKeyedStateBackend().mergePartitionedStates(
-									stateWindowResult,
-									mergedStateWindows,
-									windowSerializer,
-									(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+								evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
 							}
 						});
 
@@ -137,9 +146,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				if (stateWindow == null) {
 					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
 				}
-				ListState<StreamRecord<IN>> windowState = getPartitionedState(
-					stateWindow, windowSerializer, windowStateDescriptor);
-				windowState.add(element);
+				
+				evictingWindowState.setCurrentNamespace(stateWindow);
+				evictingWindowState.add(element);
 
 				context.key = key;
 				context.window = actualWindow;
@@ -149,16 +158,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				TriggerResult triggerResult = context.onElement(element);
 
 				if (triggerResult.isFire()) {
-					Iterable<StreamRecord<IN>> contents = windowState.get();
+					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 					if (contents == null) {
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(actualWindow, contents, windowState);
+					fire(actualWindow, contents, evictingWindowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(actualWindow, windowState, mergingWindows);
+					cleanup(actualWindow, evictingWindowState, mergingWindows);
 				} else {
 					registerCleanupTimer(actualWindow);
 				}
@@ -173,9 +182,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 					continue;
 				}
 
-				ListState<StreamRecord<IN>> windowState = getPartitionedState(
-					window, windowSerializer, windowStateDescriptor);
-				windowState.add(element);
+				evictingWindowState.setCurrentNamespace(window);
+				evictingWindowState.add(element);
 
 				context.key = key;
 				context.window = window;
@@ -185,16 +193,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				TriggerResult triggerResult = context.onElement(element);
 
 				if (triggerResult.isFire()) {
-					Iterable<StreamRecord<IN>> contents = windowState.get();
+					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 					if (contents == null) {
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(window, contents, windowState);
+					fire(window, contents, evictingWindowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(window, windowState, null);
+					cleanup(window, evictingWindowState, null);
 				} else {
 					registerCleanupTimer(window);
 				}
@@ -222,15 +230,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				// so it is safe to just ignore
 				return;
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+			
+			evictingWindowState.setCurrentNamespace(stateWindow);
 		} else {
-			windowState = getPartitionedState(
-					context.window,
-					windowSerializer,
-					windowStateDescriptor);
+			evictingWindowState.setCurrentNamespace(context.window);
 		}
 
-		Iterable<StreamRecord<IN>> contents = windowState.get();
+		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 		if (contents == null) {
 			// if we have no state, there is nothing to do
 			return;
@@ -238,11 +244,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
 		if (triggerResult.isFire()) {
-			fire(context.window, contents, windowState);
+			fire(context.window, contents, evictingWindowState);
 		}
 
 		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+			cleanup(context.window, evictingWindowState, mergingWindows);
 		}
 	}
 
@@ -265,12 +271,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				// so it is safe to just ignore
 				return;
 			}
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+			evictingWindowState.setCurrentNamespace(stateWindow);
 		} else {
-			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+			evictingWindowState.setCurrentNamespace(context.window);
 		}
 
-		Iterable<StreamRecord<IN>> contents = windowState.get();
+		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 		if (contents == null) {
 			// if we have no state, there is nothing to do
 			return;
@@ -278,11 +284,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
 		if (triggerResult.isFire()) {
-			fire(context.window, contents, windowState);
+			fire(context.window, contents, evictingWindowState);
 		}
 
 		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+			cleanup(context.window, evictingWindowState, mergingWindows);
 		}
 	}
 
@@ -381,7 +387,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	public void open() throws Exception {
 		super.open();
+
 		evictorContext = new EvictorContext(null,null);
+		evictingWindowState = (InternalListState<W, StreamRecord<IN>>) 
+				getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
 	}
 
 	@Override
@@ -409,6 +418,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@VisibleForTesting
 	@SuppressWarnings("unchecked, rawtypes")
 	public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
-		return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
+		return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) evictingWindowStateDescriptor;
 	}
 }