You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:14 UTC

[7/9] flink git commit: [FLINK-5590] [runtime] Add proper internal state hierarchy

[FLINK-5590] [runtime] Add proper internal state hierarchy

This introduces an internal state hierarchy that mirrors the external state hierarchy,
but gives the runtime access to methods that should not be part of the user facing API,
such as:
  - setting namespaces
  - accessing raw values
  - merging namespaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b97128f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b97128f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b97128f

Branch: refs/heads/master
Commit: 3b97128f05bacfb80afe4a2a49741c31ff306cd2
Parents: 2f1c474
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 13 15:17:09 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:39 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   9 +-
 .../streaming/state/RocksDBFoldingState.java    |   3 +-
 .../state/RocksDBKeyedStateBackend.java         |  20 +-
 .../streaming/state/RocksDBListState.java       |  42 ++-
 .../streaming/state/RocksDBReducingState.java   |  74 +++++-
 .../streaming/state/RocksDBValueState.java      |   3 +-
 .../streaming/state/RocksDBListStateTest.java   | 232 +++++++++++++++++
 .../state/RocksDBReducingStateTest.java         | 235 +++++++++++++++++
 .../flink/runtime/execution/Environment.java    |   4 +-
 .../runtime/io/network/NetworkEnvironment.java  |   5 +-
 .../runtime/jobmaster/JobMasterGateway.java     |   6 +-
 .../apache/flink/runtime/query/KvStateID.java   |   4 +-
 .../flink/runtime/query/KvStateLocation.java    |   4 +-
 .../runtime/query/KvStateLocationRegistry.java  |   4 +-
 .../flink/runtime/query/KvStateMessage.java     |  10 +-
 .../flink/runtime/query/KvStateRegistry.java    |  10 +-
 .../runtime/query/TaskKvStateRegistry.java      |   4 +-
 .../query/netty/KvStateServerHandler.java       |  12 +-
 .../query/netty/message/KvStateRequest.java     |   4 +-
 .../state/AbstractKeyedStateBackend.java        | 182 +++++--------
 .../flink/runtime/state/KeyedStateBackend.java  |  35 ++-
 .../org/apache/flink/runtime/state/KvState.java |  52 ----
 .../flink/runtime/state/VoidNamespace.java      |  43 +++-
 .../state/heap/AbstractHeapMergingState.java    | 123 +++++++++
 .../runtime/state/heap/AbstractHeapState.java   |   4 +-
 .../runtime/state/heap/HeapFoldingState.java    |   8 +-
 .../state/heap/HeapKeyedStateBackend.java       |  35 ++-
 .../flink/runtime/state/heap/HeapListState.java |  19 +-
 .../runtime/state/heap/HeapReducingState.java   |  23 +-
 .../runtime/state/heap/HeapValueState.java      |   3 +-
 .../flink/runtime/state/heap/StateTable.java    |  18 ++
 .../state/internal/InternalAppendingState.java  |  32 +++
 .../state/internal/InternalFoldingState.java    |  32 +++
 .../runtime/state/internal/InternalKvState.java |  79 ++++++
 .../state/internal/InternalListState.java       |  31 +++
 .../state/internal/InternalMergingState.java    |  46 ++++
 .../state/internal/InternalReducingState.java   |  31 +++
 .../state/internal/InternalValueState.java      |  31 +++
 .../runtime/query/netty/KvStateClientTest.java  |   4 +-
 .../query/netty/KvStateServerHandlerTest.java   |   6 +-
 .../message/KvStateRequestSerializerTest.java   |  30 +--
 .../runtime/state/StateBackendTestBase.java     |  43 ++--
 .../runtime/state/heap/HeapListStateTest.java   | 248 ++++++++++++++++++
 .../state/heap/HeapReducingStateTest.java       | 254 +++++++++++++++++++
 .../api/operators/AbstractStreamOperator.java   |  23 +-
 .../api/windowing/windows/TimeWindow.java       |  38 ++-
 .../streaming/api/windowing/windows/Window.java |   5 +
 .../windowing/EvictingWindowOperator.java       | 101 ++++----
 .../operators/windowing/WindowOperator.java     | 149 ++++++-----
 .../operators/windowing/WindowOperatorTest.java |  57 ++---
 .../KVStateRequestSerializerRocksDBTest.java    |  35 ++-
 51 files changed, 2054 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6785f17..89f41aa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -25,14 +25,13 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -48,9 +47,7 @@ import java.io.IOException;
  * @param <SD> The type of {@link StateDescriptor}.
  */
 public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
-		implements KvState<N>, State {
-
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
+		implements InternalKvState<N>, State {
 
 	/** Serializer for the namespace */
 	private final TypeSerializer<N> namespaceSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 9c2bf4f..26dc3dd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -41,7 +42,7 @@ import java.io.IOException;
  */
 public class RocksDBFoldingState<K, N, T, ACC>
 	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
-	implements FoldingState<T, ACC> {
+	implements InternalFoldingState<N, T, ACC> {
 
 	/** Serializer for the values */
 	private final TypeSerializer<ACC> valueSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b207af6..5a6d102 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -18,14 +18,10 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
@@ -52,6 +48,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
@@ -812,7 +812,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
+	protected <N, T> InternalValueState<N, T> createValueState(
+			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
@@ -821,7 +822,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
+	protected <N, T> InternalListState<N, T> createListState(
+			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
@@ -830,7 +832,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
+	protected <N, T> InternalReducingState<N, T> createReducingState(
+			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
@@ -839,7 +842,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index beea81a..e6988f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -30,6 +32,7 @@ import org.rocksdb.WriteOptions;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -45,7 +48,7 @@ import java.util.List;
  */
 public class RocksDBListState<K, N, V>
 	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, V>
-	implements ListState<V> {
+	implements InternalListState<N, V> {
 
 	/** Serializer for the values */
 	private final TypeSerializer<V> valueSerializer;
@@ -117,4 +120,41 @@ public class RocksDBListState<K, N, V>
 		}
 	}
 
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return;
+		}
+
+		// cache key and namespace
+		final K key = backend.getCurrentKey();
+		final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+		try {
+			// create the target full-binary-key 
+			writeKeyWithGroupAndNamespace(
+					keyGroup, key, target,
+					keySerializationStream, keySerializationDataOutputView);
+			final byte[] targetKey = keySerializationStream.toByteArray();
+
+			// merge the sources to the target
+			for (N source : sources) {
+				if (source != null) {
+					writeKeyWithGroupAndNamespace(
+							keyGroup, key, source,
+							keySerializationStream, keySerializationDataOutputView);
+
+					byte[] sourceKey = keySerializationStream.toByteArray();
+					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+
+					if (valueBytes != null) {
+						backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
+					}
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Error while merging state in RocksDB", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 068c051..ccc98a7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -22,14 +22,18 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * {@link ReducingState} implementation that stores state in RocksDB.
@@ -40,7 +44,7 @@ import java.io.IOException;
  */
 public class RocksDBReducingState<K, N, V>
 	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
-	implements ReducingState<V> {
+	implements InternalReducingState<N, V> {
 
 	/** Serializer for the values */
 	private final TypeSerializer<V> valueSerializer;
@@ -113,4 +117,72 @@ public class RocksDBReducingState<K, N, V>
 		}
 	}
 
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return;
+		}
+
+		// cache key and namespace
+		final K key = backend.getCurrentKey();
+		final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+		try {
+			V current = null;
+
+			// merge the sources to the target
+			for (N source : sources) {
+				if (source != null) {
+
+					writeKeyWithGroupAndNamespace(
+							keyGroup, key, source,
+							keySerializationStream, keySerializationDataOutputView);
+
+					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+
+					if (valueBytes != null) {
+						V value = valueSerializer.deserialize(
+								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+						if (current != null) {
+							current = reduceFunction.reduce(current, value);
+						}
+						else {
+							current = value;
+						}
+					}
+				}
+			}
+
+			// if something came out of merging the sources, merge it or write it to the target
+			if (current != null) {
+				// create the target full-binary-key 
+				writeKeyWithGroupAndNamespace(
+						keyGroup, key, target,
+						keySerializationStream, keySerializationDataOutputView);
+
+				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
+
+				if (targetValueBytes != null) {
+					// target also had a value, merge
+					V value = valueSerializer.deserialize(
+							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+
+					current = reduceFunction.reduce(current, value);
+				}
+
+				// serialize the resulting value
+				keySerializationStream.reset();
+				valueSerializer.serialize(current, keySerializationDataOutputView);
+
+				// write the resulting value
+				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Error while merging state in RocksDB", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 9563ed8..7724f02 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -40,7 +41,7 @@ import java.io.IOException;
  */
 public class RocksDBValueState<K, N, V>
 	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
-	implements ValueState<V> {
+	implements InternalValueState<N, V> {
 
 	/** Serializer for the values */
 	private final TypeSerializer<V> valueSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
new file mode 100644
index 0000000..d8d0308
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link ListState} implementation on top of RocksDB.
+ */
+public class RocksDBListStateTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+		
+		try {
+			InternalListState<VoidNamespace, Long> state = 
+					keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(asList(17L, 11L), state.get());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(asList(17L, 11L), state.get());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final TimeWindow win1 = new TimeWindow(1000, 2000);
+		final TimeWindow win2 = new TimeWindow(2000, 3000);
+		final TimeWindow win3 = new TimeWindow(3000, 4000);
+
+		final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+		try {
+			InternalListState<TimeWindow, Long> state = keyedBackend.createListState(new TimeWindow.Serializer(), stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(win1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(win2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(win3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(win3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(win3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			validateResult(state.get(), expectedResult);
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			validateResult(state.get(), expectedResult);
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
+		return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
+				new DummyEnvironment("TestTask", 1, 0),
+				new JobID(),
+				"test-op",
+				StringSerializer.INSTANCE,
+				16,
+				new KeyGroupRange(2, 3),
+				mock(TaskKvStateRegistry.class));
+	}
+
+	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
+		int num = 0;
+		for (T v : values) {
+			num++;
+			assertTrue(expected.contains(v));
+		}
+
+		assertEquals(expected.size(), num);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
new file mode 100644
index 0000000..fb854f2
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link ReducingState} implementation on top of RocksDB.
+ */
+public class RocksDBReducingStateTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr = 
+				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+		
+		try {
+			InternalReducingState<VoidNamespace, Long> state = 
+					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
+				"my-state", new AddingFunction(), Long.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final TimeWindow win1 = new TimeWindow(1000, 2000);
+		final TimeWindow win2 = new TimeWindow(2000, 3000);
+		final TimeWindow win3 = new TimeWindow(3000, 4000);
+
+		final Long expectedResult = 165L;
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+		try {
+			final InternalReducingState<TimeWindow, Long> state = 
+					keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(win1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(win2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(win3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(win3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(win3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
+		return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
+				new DummyEnvironment("TestTask", 1, 0),
+				new JobID(),
+				"test-op",
+				StringSerializer.INSTANCE,
+				16,
+				new KeyGroupRange(2, 3),
+				mock(TaskKvStateRegistry.class));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test functions
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static class AddingFunction implements ReduceFunction<Long> {
+
+		@Override
+		public Long reduce(Long a, Long b)  {
+			return a + b;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 8874eca..1675365 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Map;
@@ -151,7 +151,7 @@ public interface Environment {
 	AccumulatorRegistry getAccumulatorRegistry();
 
 	/**
-	 * Returns the registry for {@link KvState} instances.
+	 * Returns the registry for {@link InternalKvState} instances.
 	 *
 	 * @return KvState registry
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 3aba6dc..5cf2c26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.Preconditions;
@@ -60,10 +61,10 @@ public class NetworkEnvironment {
 
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
+	/** Server for {@link InternalKvState} requests. */
 	private final KvStateServer kvStateServer;
 
-	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
+	/** Registry for {@link InternalKvState} instances. */
 	private final KvStateRegistry kvStateRegistry;
 
 	private final IOManager.IOMode defaultIOMode;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5ab68fe..de7646b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -140,10 +140,10 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 		final Exception cause);
 
 	/**
-	 * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
+	 * Requests a {@link KvStateLocation} for the specified {@link InternalKvState} registration name.
 	 *
 	 * @param registrationName Name under which the KvState has been registered.
-	 * @return Future of the requested {@link KvState} location
+	 * @return Future of the requested {@link InternalKvState} location
 	 */
 	Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
index bb05833..c122508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.query;
 
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.AbstractID;
 
 /**
- * Identifier for {@link KvState} instances.
+ * Identifier for {@link InternalKvState} instances.
  *
  * <p>Assigned when registering state at the {@link KvStateRegistry}.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 90bb2a5..f074e8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -21,14 +21,14 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Arrays;
 
 /**
- * Location information for all key groups of a {@link KvState} instance.
+ * Location information for all key groups of a {@link InternalKvState} instance.
  *
  * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
  * {@link QueryableStateClient} to target queries.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
index c489025..cb61905 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -23,14 +23,14 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Simple registry, which maps {@link KvState} registration notifications to
+ * Simple registry, which maps {@link InternalKvState} registration notifications to
  * {@link KvStateLocation} instances.
  */
 public class KvStateLocationRegistry {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
index 9ac2d44..6808c5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 
 /**
- * Actor messages for {@link KvState} lookup and registration.
+ * Actor messages for {@link InternalKvState} lookup and registration.
  */
 public interface KvStateMessage extends Serializable {
 
@@ -47,7 +47,7 @@ public interface KvStateMessage extends Serializable {
 
 		/**
 		 * Requests a {@link KvStateLocation} for the specified JobID and
-		 * {@link KvState} registration name.
+		 * {@link InternalKvState} registration name.
 		 *
 		 * @param jobId            JobID the KvState instance belongs to
 		 * @param registrationName Name under which the KvState has been registered
@@ -111,7 +111,7 @@ public interface KvStateMessage extends Serializable {
 		private final KvStateServerAddress kvStateServerAddress;
 
 		/**
-		 * Notifies the JobManager about a registered {@link KvState} instance.
+		 * Notifies the JobManager about a registered {@link InternalKvState} instance.
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to
@@ -221,7 +221,7 @@ public interface KvStateMessage extends Serializable {
 		private final String registrationName;
 
 		/**
-		 * Notifies the JobManager about an unregistered {@link KvState} instance.
+		 * Notifies the JobManager about an unregistered {@link InternalKvState} instance.
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index f57ae47..aa698e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -21,15 +21,15 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A registry for {@link KvState} instances per task manager.
+ * A registry for {@link InternalKvState} instances per task manager.
  *
  * <p>This is currently only used for KvState queries: KvState instances, which
  * are marked as queryable in their state descriptor are registered here and
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KvStateRegistry {
 
 	/** All registered KvState instances. */
-	private final ConcurrentHashMap<KvStateID, KvState<?>> registeredKvStates =
+	private final ConcurrentHashMap<KvStateID, InternalKvState<?>> registeredKvStates =
 			new ConcurrentHashMap<>();
 
 	/** Registry listener to be notified on registration/unregistration. */
@@ -91,7 +91,7 @@ public class KvStateRegistry {
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
 			String registrationName,
-			KvState<?> kvState) {
+			InternalKvState<?> kvState) {
 
 		KvStateID kvStateId = new KvStateID();
 
@@ -145,7 +145,7 @@ public class KvStateRegistry {
 	 * @param kvStateId KvStateID to identify the KvState instance
 	 * @return KvState instance identified by the KvStateID or <code>null</code>
 	 */
-	public KvState<?> getKvState(KvStateID kvStateId) {
+	public InternalKvState<?> getKvState(KvStateID kvStateId) {
 		return registeredKvStates.get(kvStateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index d831214..e3cf151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -59,7 +59,7 @@ public class TaskKvStateRegistry {
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, KvState<?> kvState) {
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?> kvState) {
 		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
 		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 34cf15f..92e7658 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.message.KvStateRequest;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This handler dispatches asynchronous tasks, which query {@link KvState}
+ * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
  * instances and write the result to the channel.
  *
  * <p>The network threads receive the message, deserialize it and dispatch the
@@ -104,7 +104,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 				stats.reportRequest();
 
-				KvState<?> kvState = registry.getKvState(request.getKvStateId());
+				InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
 
 				if (kvState != null) {
 					// Execute actual query async, because it is possibly
@@ -179,7 +179,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 	}
 
 	/**
-	 * Task to execute the actual query against the {@link KvState} instance.
+	 * Task to execute the actual query against the {@link InternalKvState} instance.
 	 */
 	private static class AsyncKvStateQueryTask implements Runnable {
 
@@ -187,7 +187,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 		private final KvStateRequest request;
 
-		private final KvState<?> kvState;
+		private final InternalKvState<?> kvState;
 
 		private final KvStateRequestStats stats;
 
@@ -196,7 +196,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 		public AsyncKvStateQueryTask(
 				ChannelHandlerContext ctx,
 				KvStateRequest request,
-				KvState<?> kvState,
+				InternalKvState<?> kvState,
 				KvStateRequestStats stats) {
 
 			this.ctx = Objects.requireNonNull(ctx, "Channel handler context");

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
index 0abb653..4b73fbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.query.netty.message;
 
 import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 /**
- * A {@link KvState} instance request for a specific key and namespace.
+ * A {@link InternalKvState} instance request for a specific key and namespace.
  */
 public final class KvStateRequest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index cab2b4f..fcca77a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -19,12 +19,10 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
@@ -35,14 +33,18 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base implementation of KeyedStateBackend. The state can be checkpointed
@@ -63,13 +65,13 @@ public abstract class AbstractKeyedStateBackend<K>
 	private int currentKeyGroup;
 
 	/** So that we can give out state when the user uses the same key. */
-	protected HashMap<String, KvState<?>> keyValueStatesByName;
+	protected final HashMap<String, InternalKvState<?>> keyValueStatesByName;
 
 	/** For caching the last accessed partitioned state */
 	private String lastName;
 
 	@SuppressWarnings("rawtypes")
-	private KvState lastState;
+	private InternalKvState lastState;
 
 	/** The number of key-groups aka max parallelism */
 	protected final int numberOfKeyGroups;
@@ -98,6 +100,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.cancelStreamRegistry = new CloseableRegistry();
+		this.keyValueStatesByName = new HashMap<>();
 	}
 
 	/**
@@ -113,7 +116,7 @@ public abstract class AbstractKeyedStateBackend<K>
 
 		lastName = null;
 		lastState = null;
-		keyValueStatesByName = null;
+		keyValueStatesByName.clear();
 	}
 
 	/**
@@ -125,7 +128,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the value that the {@code ValueState} can store.
 	 */
-	protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> InternalValueState<N, T> createValueState(
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
 	 * Creates and returns a new {@link ListState}.
@@ -136,7 +141,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> InternalListState<N, T> createListState(
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
 	 * Creates and returns a new {@link ReducingState}.
@@ -147,7 +154,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> InternalReducingState<N, T> createReducingState(
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
 	 * Creates and returns a new {@link FoldingState}.
@@ -159,7 +168,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	 * @param <T> Type of the values folded into the state
 	 * @param <ACC> Type of the value in the state	 *
 	 */
-	protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+	protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
 	/**
 	 * @see KeyedStateBackend
@@ -213,35 +224,27 @@ public abstract class AbstractKeyedStateBackend<K>
 	 * @see KeyedStateBackend
 	 */
 	@Override
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		Preconditions.checkNotNull(namespace, "Namespace");
-		Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
+	public <N, S extends State, V> S getOrCreateKeyedState(
+			final TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, V> stateDescriptor) throws Exception {
+
+		checkNotNull(namespaceSerializer, "Namespace serializer");
 
 		if (keySerializer == null) {
-			throw new RuntimeException("State key serializer has not been configured in the config. " +
+			throw new UnsupportedOperationException(
+					"State key serializer has not been configured in the config. " +
 					"This operation cannot use partitioned state.");
 		}
-		
-		if (!stateDescriptor.isSerializerInitialized()) {
-			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
 
-		if (keyValueStatesByName == null) {
-			keyValueStatesByName = new HashMap<>();
+		if (!stateDescriptor.isSerializerInitialized()) {
+			throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); 
 		}
 
-		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
-			lastState.setCurrentNamespace(namespace);
-			return (S) lastState;
-		}
-
-		KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
-		if (previous != null) {
-			lastState = previous;
-			lastState.setCurrentNamespace(namespace);
-			lastName = stateDescriptor.getName();
-			return (S) previous;
+		InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
+		if (existing != null) {
+			@SuppressWarnings("unchecked")
+			S typedState = (S) existing;
+			return typedState;
 		}
 
 		// create a new blank key/value state
@@ -268,15 +271,10 @@ public abstract class AbstractKeyedStateBackend<K>
 
 		});
 
-		KvState kvState = (KvState) state;
-
+		@SuppressWarnings("unchecked")
+		InternalKvState<N> kvState = (InternalKvState<N>) state;
 		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
 
-		lastName = stateDescriptor.getName();
-		lastState = kvState;
-
-		kvState.setCurrentNamespace(namespace);
-
 		// Publish queryable state
 		if (stateDescriptor.isQueryable()) {
 			if (kvStateRegistry == null) {
@@ -290,90 +288,48 @@ public abstract class AbstractKeyedStateBackend<K>
 		return state;
 	}
 
+	/**
+	 * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
+	 *       This method should be removed for the sake of namespaces being lazily fetched from the keyed
+	 *       state backend, or being set on the state directly.
+	 * 
+	 * @see KeyedStateBackend
+	 */
+	@SuppressWarnings("unchecked")
 	@Override
-	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(
-			final N target,
-			Collection<N> sources,
+	public <N, S extends State> S getPartitionedState(
+			final N namespace,
 			final TypeSerializer<N> namespaceSerializer,
 			final StateDescriptor<S, ?> stateDescriptor) throws Exception {
 
-		if (stateDescriptor instanceof ReducingStateDescriptor) {
-			mergeReducingState((ReducingStateDescriptor<?>) stateDescriptor, namespaceSerializer, target,sources);
-		}
-		else if (stateDescriptor instanceof ListStateDescriptor) {
-			mergeListState((ListStateDescriptor<?>) stateDescriptor, namespaceSerializer, target,sources);
-		}
-		else {
-			throw new IllegalArgumentException("Cannot merge states for " + stateDescriptor);
-		}
-	}
+		checkNotNull(namespace, "Namespace");
 
-	private <N, T> void mergeReducingState(
-			final ReducingStateDescriptor<?> stateDescriptor,
-			final TypeSerializer<N> namespaceSerializer,
-			final N target,
-			final Collection<N> sources) throws Exception {
-
-		@SuppressWarnings("unchecked")
-		final ReducingStateDescriptor<T> reducingStateDescriptor = (ReducingStateDescriptor<T>) stateDescriptor;
-
-		@SuppressWarnings("unchecked")
-		final ReducingState<T> state = (ReducingState<T>) getPartitionedState(target, namespaceSerializer, stateDescriptor);
-
-		@SuppressWarnings("unchecked")
-		final KvState<N> kvState = (KvState<N>) state;
-
-		final ReduceFunction<T> reduceFn = reducingStateDescriptor.getReduceFunction();
-
-		T result = null;
-		for (N source: sources) {
-			kvState.setCurrentNamespace(source);
-			T sourceValue = state.get();
-			if (result == null) {
-				result = state.get();
-			} else if (sourceValue != null) {
-				result = reduceFn.reduce(result, sourceValue);
-			}
-			state.clear();
+		// TODO: This is wrong, it should throw an exception that the initialization has not properly happened
+		if (!stateDescriptor.isSerializerInitialized()) {
+			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
 		}
 
-		// write result to the target
-		kvState.setCurrentNamespace(target);
-		if (result != null) {
-			state.add(result);
+		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+			lastState.setCurrentNamespace(namespace);
+			return (S) lastState;
 		}
-	}
 
-	private <N, T> void mergeListState(
-			final ListStateDescriptor<?> listStateDescriptor,
-			final TypeSerializer<N> namespaceSerializer,
-			final N target,
-			final Collection<N> sources) throws Exception {
+		InternalKvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+		if (previous != null) {
+			lastState = previous;
+			lastState.setCurrentNamespace(namespace);
+			lastName = stateDescriptor.getName();
+			return (S) previous;
+		}
 
-		@SuppressWarnings("unchecked")
-		final ListState<T> state = (ListState<T>) getPartitionedState(target, namespaceSerializer, listStateDescriptor);
+		final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+		final InternalKvState<N> kvState = (InternalKvState<N>) state;
 
-		@SuppressWarnings("unchecked")
-		final KvState<N> kvState = (KvState<N>) state;
-
-		// merge the sources
-		final List<T> result = new ArrayList<>();
-		for (N source: sources) {
-			kvState.setCurrentNamespace(source);
-			Iterable<T> sourceValue = state.get();
-			if (sourceValue != null) {
-				for (T o : sourceValue) {
-					result.add(o);
-				}
-			}
-			state.clear();
-		}
+		lastName = stateDescriptor.getName();
+		lastState = kvState;
+		kvState.setCurrentNamespace(namespace);
 
-		// write to the target
-		kvState.setCurrentNamespace(target);
-		for (T o : result) {
-			state.add(o);
-		}
+		return state;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 03f584e..15e0491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Collection;
-
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
@@ -64,10 +61,31 @@ public interface KeyedStateBackend<K> {
 	TypeSerializer<K> getKeySerializer();
 
 	/**
-	 * Creates or retrieves a partitioned state backed by this state backend.
+	 * Creates or retrieves a keyed state backed by this state backend.
 	 *
+	 * @param namespaceSerializer The serializer used for the namespace type of the state
 	 * @param stateDescriptor The identifier for the state. This contains name and can create a default state value.
+	 *    
+	 * @param <N> The type of the namespace.
+	 * @param <S> The type of the state.
+	 *
+	 * @return A new key/value state backed by this backend.
+	 *
+	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+	 */
+	<N, S extends State, T> S getOrCreateKeyedState(
+			TypeSerializer<N> namespaceSerializer,
+			StateDescriptor<S, T> stateDescriptor) throws Exception;
 
+	/**
+	 * Creates or retrieves a partitioned state backed by this state backend.
+	 * 
+	 * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
+	 *       This method should be removed for the sake of namespaces being lazily fetched from the keyed
+	 *       state backend, or being set on the state directly.
+	 *
+	 * @param stateDescriptor The identifier for the state. This contains name and can create a default state value.
+	 *
 	 * @param <N> The type of the namespace.
 	 * @param <S> The type of the state.
 	 *
@@ -75,20 +93,11 @@ public interface KeyedStateBackend<K> {
 	 *
 	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
 	<N, S extends State> S getPartitionedState(
 			N namespace,
 			TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, ?> stateDescriptor) throws Exception;
 
-
-	@SuppressWarnings("unchecked,rawtypes")
-	<N, S extends MergingState<?, ?>> void mergePartitionedStates(
-			N target,
-			Collection<N> sources,
-			TypeSerializer<N> namespaceSerializer,
-			StateDescriptor<S, ?> stateDescriptor) throws Exception;
-
 	/**
 	 * Closes the backend and releases all resources.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
deleted file mode 100644
index aded79f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * Key/Value state implementation for user-defined state. The state is backed by a state
- * backend, which typically follows one of the following patterns: Either the state is stored
- * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
- * state backend into some store (during checkpoints), or the key/value state is in fact backed
- * by an external key/value store as the state backend, and checkpoints merely record the
- * metadata of what is considered part of the checkpoint.
- * 
- * @param <N> The type of the namespace.
- */
-public interface KvState<N> {
-
-	/**
-	 * Sets the current namespace, which will be used when using the state access methods.
-	 *
-	 * @param namespace The namespace.
-	 */
-	void setCurrentNamespace(N namespace);
-
-	/**
-	 * Returns the serialized value for the given key and namespace.
-	 *
-	 * <p>If no value is associated with key and namespace, <code>null</code>
-	 * is returned.
-	 *
-	 * @param serializedKeyAndNamespace Serialized key and namespace
-	 * @return Serialized value or <code>null</code> if no value is associated
-	 * with the key and namespace.
-	 * @throws Exception Exceptions during serialization are forwarded
-	 */
-	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespace.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespace.java
index 9ff9df0..fb2e512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespace.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespace.java
@@ -18,18 +18,53 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.ObjectStreamException;
+
 /**
- * Uninstantiable placeholder class for state without a namespace.
+ * Singleton placeholder class for state without a namespace.
  */
 public final class VoidNamespace {
 
-	public static final VoidNamespace INSTANCE = new VoidNamespace();
+	// ------------------------------------------------------------------------
+	//  Singleton instance
+	// ------------------------------------------------------------------------
 
-	private VoidNamespace() {
-	}
+	/** The singleton instance */
+	public static final VoidNamespace INSTANCE = new VoidNamespace();
 
+	/** Getter for the singleton instance */
 	public static VoidNamespace get() {
 		return INSTANCE;
 	}
 
+	/** This class should not be instantiated */
+	private VoidNamespace() {}
+
+	// ------------------------------------------------------------------------
+	//  Standard Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 99;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this;
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Singleton serialization
+	// ------------------------------------------------------------------------
+
+	// make sure that we preserve the singleton properly on serialization
+	private Object readResolve() throws ObjectStreamException {
+		return INSTANCE;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
new file mode 100644
index 0000000..4ac7125
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
+ * that is stored on the heap.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+		extends AbstractHeapState<K, N, SV, S, SD>
+		implements InternalMergingState<N, IN, OUT> {
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param backend The state backend backing that created this state.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 */
+	protected AbstractHeapMergingState(
+			KeyedStateBackend<K> backend,
+			SD stateDesc,
+			StateTable<K, N, SV> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+
+		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return; // nothing to do
+		}
+
+		final K key = backend.getCurrentKey();
+		checkState(key != null, "No key set.");
+
+		final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
+
+		if (namespaceMap != null) {
+			SV merged = null;
+
+			// merge the sources
+			for (N source : sources) {
+				Map<K, SV> keysForNamespace = namespaceMap.get(source);
+				if (keysForNamespace != null) {
+					// get and remove the next source per namespace/key
+					SV sourceState = keysForNamespace.remove(key);
+
+					// if the namespace map became empty, remove 
+					if (keysForNamespace.isEmpty()) {
+						namespaceMap.remove(source);
+					}
+
+					if (merged != null && sourceState != null) {
+						merged = mergeState(merged, sourceState);
+					}
+					else if (merged == null) {
+						merged = sourceState;
+					}
+				}
+			}
+
+			// merge into the target, if needed
+			if (merged != null) {
+				Map<K, SV> keysForTarget = namespaceMap.get(target);
+				if (keysForTarget == null) {
+					keysForTarget = createNewMap();
+					namespaceMap.put(target, keysForTarget);
+				}
+				SV targetState = keysForTarget.get(key);
+
+				if (targetState != null) {
+					targetState = mergeState(targetState, merged);
+				}
+				else {
+					targetState = merged;
+				}
+				keysForTarget.put(key, targetState);
+			}
+		}
+
+		// else no entries for that key at all, nothing to do skip
+	}
+
+	protected abstract SV mergeState(SV a, SV b) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18d1bc7..18b71de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @param <SD> The type of StateDescriptor for the State S
  */
 public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
-		implements KvState<N>, State {
+		implements InternalKvState<N> {
 
 	/** Map containing the actual key/value pairs */
 	protected final StateTable<K, N, SV> stateTable;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 1679122..6df3f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -39,8 +40,9 @@ import java.util.Map;
  */
 public class HeapFoldingState<K, N, T, ACC>
 		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
-		implements FoldingState<T, ACC> {
+		implements InternalFoldingState<N, T, ACC> {
 
+	/** The function used to fold the state */
 	private final FoldFunction<T, ACC> foldFunction;
 
 	/**
@@ -61,6 +63,10 @@ public class HeapFoldingState<K, N, T, ACC>
 		this.foldFunction = stateDesc.getFoldFunction();
 	}
 
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
 	@Override
 	public ACC get() {
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 9a9178a..89d4f76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,14 +19,10 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
@@ -54,6 +50,10 @@ import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -133,17 +133,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return stateTable;
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+	public <N, V> InternalValueState<N, V> createValueState(
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc) throws Exception {
+
 		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+	public <N, T> InternalListState<N, T> createListState(
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<T> stateDesc) throws Exception {
+
 		String name = stateDesc.getName();
+
+		@SuppressWarnings("unchecked")
 		StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
 
 		RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
@@ -153,15 +159,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+	public <N, T> InternalReducingState<N, T> createReducingState(
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<T> stateDesc) throws Exception {
+
 		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
-	@SuppressWarnings("unchecked")
+
 	@Override
-	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
 		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}