You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/03/29 15:59:41 UTC

[1/5] flink git commit: [FLINK-8908] Do not create copy when MapSerializer stateless.

Repository: flink
Updated Branches:
  refs/heads/master 98924f0ab -> db8e1f09b


[FLINK-8908] Do not create copy when MapSerializer stateless.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eea887b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eea887b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eea887b7

Branch: refs/heads/master
Commit: eea887b7d9a3cd43416feca568c9815d8362e8d4
Parents: 98924f0
Author: kkloudas <kk...@gmail.com>
Authored: Fri Mar 9 12:05:38 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:32:51 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/typeutils/base/MapSerializer.java    | 4 +++-
 .../java/org/apache/flink/runtime/state/HashMapSerializer.java   | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eea887b7/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 23b494b..6471152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -94,7 +94,9 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 		TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
 		TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
 
-		return new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
+		return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
+				? this
+				: new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eea887b7/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 066684b..c1b6346 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -95,7 +95,9 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
 		TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
 		TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
 
-		return new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
+		return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
+				? this
+				: new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
 	}
 
 	@Override


[4/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

Posted by kk...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 717711d..41df1b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -905,7 +905,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -955,7 +955,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", restored1.value());
@@ -971,7 +971,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("u1", restored2.value());
@@ -987,7 +987,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
-	 * Tests {@link ValueState#value()} and {@link InternalKvState#getSerializedValue(byte[])}
+	 * Tests {@link ValueState#value()} and
+	 * {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)}
 	 * accessing the state concurrently. They should not get in the way of each
 	 * other.
 	 */
@@ -1012,7 +1013,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
 		@SuppressWarnings("unchecked")
-		final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
+		final InternalKvState<Integer, Integer, String> kvState = (InternalKvState<Integer, Integer, String>) state;
 
 		/**
 		 * 1) Test that ValueState#value() before and after
@@ -1232,7 +1233,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
@@ -1290,7 +1291,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", joiner.join(restored1.get()));
@@ -1307,7 +1308,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("1,u1", joiner.join(restored2.get()));
@@ -1560,8 +1561,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		final Integer namespace3 = 3;
 
 		try {
-			InternalListState<Integer, Long> state =
-				(InternalListState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+			InternalListState<String, Integer, Long> state =
+				(InternalListState<String, Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
@@ -1676,7 +1677,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -1726,7 +1727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", restored1.get());
@@ -1742,7 +1743,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("1,u1", restored2.get());
@@ -1827,8 +1828,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
 		try {
-			final InternalReducingState<Integer, Long> state =
-				(InternalReducingState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+			final InternalReducingState<String, Integer, Long> state =
+				(InternalReducingState<String, Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
@@ -2000,8 +2001,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
 		try {
-			InternalAggregatingState<Integer, Long, Long> state =
-				(InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+			InternalAggregatingState<String, Integer, Long, Long, Long> state =
+				(InternalAggregatingState<String, Integer, Long, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
@@ -2173,8 +2174,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
 		try {
-			InternalAggregatingState<Integer, Long, Long> state =
-				(InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+			InternalAggregatingState<String, Integer, Long, Long, Long> state =
+				(InternalAggregatingState<String, Integer, Long, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
@@ -2292,7 +2293,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -2343,7 +2344,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("Fold-Initial:,1", restored1.get());
@@ -2360,7 +2361,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		@SuppressWarnings("unchecked")
 		FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("Fold-Initial:,101", restored2.get());
@@ -2388,7 +2389,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state;
 
 		// these are only available after the backend initialized the serializer
 		TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
@@ -2491,7 +2492,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", restored1.get(1));
@@ -2510,7 +2511,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		@SuppressWarnings("unchecked")
 		MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("101", restored2.get(1));
@@ -3066,7 +3067,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+			InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3088,7 +3089,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+			InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3115,7 +3116,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+			InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3142,7 +3143,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+			InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3163,7 +3164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					VoidNamespaceSerializer.INSTANCE,
 					desc);
 
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+			InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state;
 			assertTrue(kvState instanceof AbstractHeapState);
 
 			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3325,7 +3326,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			// insert some data to the backend.
-			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
 				VoidNamespaceSerializer.INSTANCE,
 				new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3382,7 +3383,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			backend = createKeyedBackend(IntSerializer.INSTANCE);
-			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3429,7 +3430,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		try {
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
 
-			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3544,7 +3545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				return;
 			}
 
-			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3598,7 +3599,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 * if it is not null.
 	 */
 	protected static <V, K, N> V getSerializedValue(
-			InternalKvState<N> kvState,
+			InternalKvState<K, N, V> kvState,
 			K key,
 			TypeSerializer<K> keySerializer,
 			N namespace,
@@ -3608,7 +3609,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
 				key, keySerializer, namespace, namespaceSerializer);
 
-		byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+		byte[] serializedValue = kvState.getSerializedValue(
+				serializedKeyAndNamespace,
+				kvState.getKeySerializer(),
+				kvState.getNamespaceSerializer(),
+				kvState.getValueSerializer()
+		);
 
 		if (serializedValue == null) {
 			return null;
@@ -3622,7 +3628,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 * if it is not null.
 	 */
 	private static <V, K, N> List<V> getSerializedList(
-			InternalKvState<N> kvState,
+			InternalKvState<K, N, V> kvState,
 			K key,
 			TypeSerializer<K> keySerializer,
 			N namespace,
@@ -3632,7 +3638,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
 				key, keySerializer, namespace, namespaceSerializer);
 
-		byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+		byte[] serializedValue = kvState.getSerializedValue(
+				serializedKeyAndNamespace,
+				kvState.getKeySerializer(),
+				kvState.getNamespaceSerializer(),
+				kvState.getValueSerializer()
+		);
 
 		if (serializedValue == null) {
 			return null;
@@ -3646,7 +3657,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 * if it is not null.
 	 */
 	private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(
-			InternalKvState<N> kvState,
+			InternalKvState<K, N, Map<UK, UV>> kvState,
 			K key,
 			TypeSerializer<K> keySerializer,
 			N namespace,
@@ -3658,7 +3669,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
 				key, keySerializer, namespace, namespaceSerializer);
 
-		byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+		byte[] serializedValue = kvState.getSerializedValue(
+				serializedKeyAndNamespace,
+				kvState.getKeySerializer(),
+				kvState.getNamespaceSerializer(),
+				kvState.getValueSerializer()
+		);
 
 		if (serializedValue == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 7d903cc..e646c64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -119,7 +119,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 
 		try {
 
-			InternalValueState<VoidNamespace, String> state =
+			InternalValueState<String, VoidNamespace, String> state =
 				stateBackend.createValueState(
 					new VoidNamespaceSerializer(),
 					stateDescriptor);
@@ -163,7 +163,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 
 			stateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
-			InternalValueState<VoidNamespace, String> state = stateBackend.createValueState(
+			InternalValueState<String, VoidNamespace, String> state = stateBackend.createValueState(
 				new VoidNamespaceSerializer(),
 				stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index d83aa9f..815ceae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -69,7 +69,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 			final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
 			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+			InternalListState<String, Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
 
 			assertEquals(7, keyedBackend.numStateEntries());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 8915261..8c59979 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -43,11 +43,11 @@ import java.io.IOException;
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
+ * @param <V> The type of values kept internally in state.
  * @param <S> The type of {@link State}.
  * @param <SD> The type of {@link StateDescriptor}.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
-		implements InternalKvState<N>, State {
+public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends StateDescriptor<S, V>> implements InternalKvState<K, N, V>, State {
 
 	/** Serializer for the namespace. */
 	final TypeSerializer<N> namespaceSerializer;
@@ -114,25 +114,36 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<V> safeValueSerializer) throws Exception {
+
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
 		//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
-		Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace(
-				serializedKeyAndNamespace,
-				backend.getKeySerializer(),
-				namespaceSerializer);
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
 		// we cannot reuse the keySerializationStream member since this method
 		// is called concurrently to the other ones and it may thus contain garbage
 		ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
 		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
 
-		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
-			tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+		writeKeyWithGroupAndNamespace(
+				keyGroup,
+				keyAndNamespace.f0,
+				safeKeySerializer,
+				keyAndNamespace.f1,
+				safeNamespaceSerializer,
+				tmpKeySerializationStream,
+				tmpKeySerializationDateDataOutputView);
 
 		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
 	}
@@ -151,11 +162,32 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 			ByteArrayOutputStreamWithPos keySerializationStream,
 			DataOutputView keySerializationDataOutputView) throws IOException {
 
+		writeKeyWithGroupAndNamespace(
+				keyGroup,
+				key,
+				backend.getKeySerializer(),
+				namespace,
+				namespaceSerializer,
+				keySerializationStream,
+				keySerializationDataOutputView);
+	}
+
+	protected void writeKeyWithGroupAndNamespace(
+			final int keyGroup,
+			final K key,
+			final TypeSerializer<K> keySerializer,
+			final N namespace,
+			final TypeSerializer<N> namespaceSerializer,
+			final ByteArrayOutputStreamWithPos keySerializationStream,
+			final DataOutputView keySerializationDataOutputView) throws IOException {
+
 		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+		Preconditions.checkNotNull(keySerializer);
+		Preconditions.checkNotNull(namespaceSerializer);
 
 		keySerializationStream.reset();
 		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-		RocksDBKeySerializationUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
 		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 4dfc772..14671a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -43,8 +43,8 @@ import java.util.Collection;
  * @param <R> The type of the value returned from the state
  */
 public class RocksDBAggregatingState<K, N, T, ACC, R>
-	extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
-	implements InternalAggregatingState<N, T, R> {
+		extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>>
+		implements InternalAggregatingState<K, N, T, ACC, R> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<ACC> valueSerializer;
@@ -73,6 +73,21 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public R get() throws IOException {
 		try {
 			// prepare the current key and namespace for RocksDB lookup

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d886f44..6f5baff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -44,8 +44,8 @@ import java.io.IOException;
  */
 @Deprecated
 public class RocksDBFoldingState<K, N, T, ACC>
-	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
-	implements InternalFoldingState<N, T, ACC> {
+		extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+		implements InternalFoldingState<K, N, T, ACC> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<ACC> valueSerializer;
@@ -72,6 +72,21 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public ACC get() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
@@ -108,5 +123,4 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 41b7bd0..6a23181 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1203,7 +1203,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> InternalValueState<N, T> createValueState(
+	protected <N, T> InternalValueState<K, N, T> createValueState(
 		TypeSerializer<N> namespaceSerializer,
 		ValueStateDescriptor<T> stateDesc) throws Exception {
 
@@ -1213,7 +1213,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> InternalListState<N, T> createListState(
+	protected <N, T> InternalListState<K, N, T> createListState(
 		TypeSerializer<N> namespaceSerializer,
 		ListStateDescriptor<T> stateDesc) throws Exception {
 
@@ -1223,7 +1223,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> InternalReducingState<N, T> createReducingState(
+	protected <N, T> InternalReducingState<K, N, T> createReducingState(
 		TypeSerializer<N> namespaceSerializer,
 		ReducingStateDescriptor<T> stateDesc) throws Exception {
 
@@ -1233,7 +1233,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+	protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
 		TypeSerializer<N> namespaceSerializer,
 		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
@@ -1242,7 +1242,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+	protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
 		TypeSerializer<N> namespaceSerializer,
 		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
@@ -1252,7 +1252,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+	protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
 		TypeSerializer<N> namespaceSerializer,
 		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 62c169b..d9bcb6a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -47,8 +47,8 @@ import java.util.List;
  * @param <V> The type of the values in the list state.
  */
 public class RocksDBListState<K, N, V>
-	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
-	implements InternalListState<N, V> {
+		extends AbstractRocksDBState<K, N, List<V>, ListState<V>, ListStateDescriptor<V>>
+		implements InternalListState<K, N, V> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
@@ -75,6 +75,21 @@ public class RocksDBListState<K, N, V>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<List<V>> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public Iterable<V> get() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index af789ac..c75a2ed 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -52,14 +53,14 @@ import java.util.Map;
  * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
  * we use the {@code merge()} call.
  *
- * @param <K>  The type of the key.
- * @param <N>  The type of the namespace.
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
  * @param <UK> The type of the keys in the map state.
  * @param <UV> The type of the values in the map state.
  */
 public class RocksDBMapState<K, N, UK, UV>
-	extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
-	implements InternalMapState<N, UK, UV> {
+		extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+		implements InternalMapState<K, N, UK, UV, Map<UK, UV>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
 
@@ -76,7 +77,8 @@ public class RocksDBMapState<K, N, UK, UV>
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state.
 	 */
-	public RocksDBMapState(ColumnFamilyHandle columnFamily,
+	public RocksDBMapState(
+			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc,
 			RocksDBKeyedStateBackend<K> backend) {
@@ -87,6 +89,21 @@ public class RocksDBMapState<K, N, UK, UV>
 		this.userValueSerializer = stateDesc.getValueSerializer();
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<Map<UK, UV>> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  MapState Implementation
 	// ------------------------------------------------------------------------
@@ -158,7 +175,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		return new Iterable<UK>() {
 			@Override
 			public Iterator<UK> iterator() {
-				return new RocksDBMapIterator<UK>(backend.db, prefixBytes) {
+				return new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
 					@Override
 					public UK next() {
 						RocksDBMapEntry entry = nextEntry();
@@ -176,7 +193,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		return new Iterable<UV>() {
 			@Override
 			public Iterator<UV> iterator() {
-				return new RocksDBMapIterator<UV>(backend.db, prefixBytes) {
+				return new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
 					@Override
 					public UV next() {
 						RocksDBMapEntry entry = nextEntry();
@@ -191,7 +208,7 @@ public class RocksDBMapState<K, N, UK, UV>
 	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) {
+		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
 			@Override
 			public Map.Entry<UK, UV> next() {
 				return nextEntry();
@@ -215,23 +232,48 @@ public class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
+
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
 		//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
-		Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace(
-				serializedKeyAndNamespace,
-				backend.getKeySerializer(),
-				namespaceSerializer);
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
 		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
 		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
-		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
+
+		writeKeyWithGroupAndNamespace(
+				keyGroup,
+				keyAndNamespace.f0,
+				safeKeySerializer,
+				keyAndNamespace.f1,
+				safeNamespaceSerializer,
+				outputStream,
+				outputView);
+
 		final byte[] keyPrefixBytes = outputStream.toByteArray();
 
-		final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
+		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
+
+		final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
+		final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
+
+		final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
+				backend.db,
+				keyPrefixBytes,
+				dupUserKeySerializer,
+				dupUserValueSerializer) {
+
 			@Override
 			public Map.Entry<UK, UV> next() {
 				return nextEntry();
@@ -248,7 +290,7 @@ public class RocksDBMapState<K, N, UK, UV>
 			public Iterator<Map.Entry<UK, UV>> iterator() {
 				return iterator;
 			}
-		}, userKeySerializer, userValueSerializer);
+		}, dupUserKeySerializer, dupUserValueSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -270,34 +312,42 @@ public class RocksDBMapState<K, N, UK, UV>
 	}
 
 	private byte[] serializeUserValue(UV userValue) throws IOException {
+		return serializeUserValue(userValue, userValueSerializer);
+	}
+
+	private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+		return deserializeUserValue(rawValueBytes, userValueSerializer);
+	}
+
+	private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
 		keySerializationStream.reset();
 
 		if (userValue == null) {
 			keySerializationDataOutputView.writeBoolean(true);
 		} else {
 			keySerializationDataOutputView.writeBoolean(false);
-			userValueSerializer.serialize(userValue, keySerializationDataOutputView);
+			valueSerializer.serialize(userValue, keySerializationDataOutputView);
 		}
 
 		return keySerializationStream.toByteArray();
 	}
 
-	private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
+	private UK deserializeUserKey(byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
 		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 
 		in.skipBytes(userKeyOffset);
 
-		return userKeySerializer.deserialize(in);
+		return keySerializer.deserialize(in);
 	}
 
-	private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+	private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
 		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 
 		boolean isNull = in.readBoolean();
 
-		return isNull ? null : userValueSerializer.deserialize(in);
+		return isNull ? null : valueSerializer.deserialize(in);
 	}
 
 	// ------------------------------------------------------------------------
@@ -320,15 +370,25 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		/** The user key and value. The deserialization is performed lazily, i.e. the key
 		 * and the value is deserialized only when they are accessed. */
-		private UK userKey = null;
-		private UV userValue = null;
+		private UK userKey;
+
+		private UV userValue;
+
+		private TypeSerializer<UK> keySerializer;
+
+		private TypeSerializer<UV> valueSerializer;
 
 		RocksDBMapEntry(
-			@Nonnull final RocksDB db,
-			@Nonnull final byte[] rawKeyBytes,
-			@Nonnull final byte[] rawValueBytes) {
+				@Nonnull final RocksDB db,
+				@Nonnull final byte[] rawKeyBytes,
+				@Nonnull final byte[] rawValueBytes,
+				@Nonnull final TypeSerializer<UK> keySerializer,
+				@Nonnull final TypeSerializer<UV> valueSerializer) {
 			this.db = db;
 
+			this.keySerializer = keySerializer;
+			this.valueSerializer = valueSerializer;
+
 			this.rawKeyBytes = rawKeyBytes;
 			this.rawValueBytes = rawValueBytes;
 			this.deleted = false;
@@ -349,9 +409,9 @@ public class RocksDBMapState<K, N, UK, UV>
 		public UK getKey() {
 			if (userKey == null) {
 				try {
-					userKey = deserializeUserKey(rawKeyBytes);
+					userKey = deserializeUserKey(rawKeyBytes, keySerializer);
 				} catch (IOException e) {
-					throw new RuntimeException("Error while deserializing the user key.");
+					throw new RuntimeException("Error while deserializing the user key.", e);
 				}
 			}
 
@@ -365,9 +425,9 @@ public class RocksDBMapState<K, N, UK, UV>
 			} else {
 				if (userValue == null) {
 					try {
-						userValue = deserializeUserValue(rawValueBytes);
+						userValue = deserializeUserValue(rawValueBytes, valueSerializer);
 					} catch (IOException e) {
-						throw new RuntimeException("Error while deserializing the user value.");
+						throw new RuntimeException("Error while deserializing the user value.", e);
 					}
 				}
 
@@ -385,7 +445,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 			try {
 				userValue = value;
-				rawValueBytes = serializeUserValue(value);
+				rawValueBytes = serializeUserValue(value, valueSerializer);
 
 				db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 			} catch (IOException | RocksDBException e) {
@@ -399,14 +459,14 @@ public class RocksDBMapState<K, N, UK, UV>
 	/** An auxiliary utility to scan all entries under the given key. */
 	private abstract class RocksDBMapIterator<T> implements Iterator<T> {
 
-		static final int CACHE_SIZE_LIMIT = 128;
+		private static final int CACHE_SIZE_LIMIT = 128;
 
 		/** The db where data resides. */
 		private final RocksDB db;
 
 		/**
 		 * The prefix bytes of the key being accessed. All entries under the same key
-		 * has the same prefix, hence we can stop the iterating once coming across an
+		 * have the same prefix, hence we can stop iterating once coming across an
 		 * entry with a different prefix.
 		 */
 		private final byte[] keyPrefixBytes;
@@ -421,9 +481,19 @@ public class RocksDBMapState<K, N, UK, UV>
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
 		private int cacheIndex = 0;
 
-		RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
+		private final TypeSerializer<UK> keySerializer;
+		private final TypeSerializer<UV> valueSerializer;
+
+		RocksDBMapIterator(
+				final RocksDB db,
+				final byte[] keyPrefixBytes,
+				final TypeSerializer<UK> keySerializer,
+				final TypeSerializer<UV> valueSerializer) {
+
 			this.db = db;
 			this.keyPrefixBytes = keyPrefixBytes;
+			this.keySerializer = keySerializer;
+			this.valueSerializer = valueSerializer;
 		}
 
 		@Override
@@ -436,7 +506,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		@Override
 		public void remove() {
 			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
-				throw new IllegalStateException("The remove operation must be called after an valid next operation.");
+				throw new IllegalStateException("The remove operation must be called after a valid next operation.");
 			}
 
 			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
@@ -503,7 +573,13 @@ public class RocksDBMapState<K, N, UK, UV>
 					break;
 				}
 
-				RocksDBMapEntry entry = new RocksDBMapEntry(db, iterator.key(), iterator.value());
+				RocksDBMapEntry entry = new RocksDBMapEntry(
+						db,
+						iterator.key(),
+						iterator.value(),
+						keySerializer,
+						valueSerializer);
+
 				cacheEntries.add(entry);
 
 				iterator.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 2a7f6e0..bef5e40 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -42,8 +42,8 @@ import java.util.Collection;
  * @param <V> The type of value that the state state stores.
  */
 public class RocksDBReducingState<K, N, V>
-	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
-	implements InternalReducingState<N, V> {
+		extends AbstractRocksDBState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
+		implements InternalReducingState<K, N, V> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
@@ -69,6 +69,21 @@ public class RocksDBReducingState<K, N, V>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public V get() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 99718be..6d7fb56 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -39,8 +39,8 @@ import java.io.IOException;
  * @param <V> The type of value that the state state stores.
  */
 public class RocksDBValueState<K, N, V>
-	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
-	implements InternalValueState<N, V> {
+		extends AbstractRocksDBState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+		implements InternalValueState<K, N, V> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
@@ -62,6 +62,21 @@ public class RocksDBValueState<K, N, V>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return backend.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public V value() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index fd01446..65cf167 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -76,7 +76,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 	private transient EvictorContext evictorContext;
 
-	private transient InternalListState<W, StreamRecord<IN>> evictingWindowState;
+	private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;
 
 	// ------------------------------------------------------------------------
 
@@ -428,7 +428,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		super.open();
 
 		evictorContext = new EvictorContext(null, null);
-		evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
+		evictingWindowState = (InternalListState<K, W, StreamRecord<IN>>)
 				getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index aa19168..9e3898c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -144,16 +144,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// ------------------------------------------------------------------------
 
 	/** The state in which the window contents is stored. Each window is a namespace */
-	private transient InternalAppendingState<W, IN, ACC> windowState;
+	private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
 
 	/**
 	 * The {@link #windowState}, typed to merging state for merging windows.
 	 * Null if the window state is not mergeable.
 	 */
-	private transient InternalMergingState<W, IN, ACC> windowMergingState;
+	private transient InternalMergingState<K, W, IN, ACC, ACC> windowMergingState;
 
 	/** The state that holds the merging window metadata (the sets that describe what is merged). */
-	private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState;
+	private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;
 
 	/**
 	 * This is given to the {@code InternalWindowFunction} for emitting elements with a given
@@ -234,7 +234,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		// create (or restore) the state that hold the actual window contents
 		// NOTE - the state may be null in the case of the overriding evicting window operator
 		if (windowStateDescriptor != null) {
-			windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
+			windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
 		}
 
 		// create the typed and helper states for merging windows
@@ -242,7 +242,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 			// store a typed reference for the state of merging windows - sanity check
 			if (windowState instanceof InternalMergingState) {
-				windowMergingState = (InternalMergingState<W, IN, ACC>) windowState;
+				windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
 			}
 			// TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
 			// TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
@@ -263,7 +263,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					new ListStateDescriptor<>("merging-window-set", tupleSerializer);
 
 			// get the state that stores the merging sets
-			mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
+			mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
 					getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
 			mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
 		}
@@ -883,7 +883,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 					if (rawState instanceof InternalMergingState) {
 						@SuppressWarnings("unchecked")
-						InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+						InternalMergingState<K, W, ?, ?, ?> mergingState = (InternalMergingState<K, W, ?, ?, ?>) rawState;
 						mergingState.mergeNamespaces(window, mergedWindows);
 					}
 					else {

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 5aa47e8..1536956 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -379,7 +379,7 @@ public class TriggerTestHarness<T, W extends Window> {
 
 				if (rawState instanceof InternalMergingState) {
 					@SuppressWarnings("unchecked")
-					InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+					InternalMergingState<K, W, ?, ?, ?> mergingState = (InternalMergingState<K, W, ?, ?, ?>) rawState;
 					mergingState.mergeNamespaces(window, mergedWindows);
 				}
 				else {


[5/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

Posted by kk...@apache.org.
[FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

This closes #5691.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db8e1f09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db8e1f09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db8e1f09

Branch: refs/heads/master
Commit: db8e1f09bd7dcd9f392bf987e96cddcb34665b6c
Parents: c16e2c9
Author: kkloudas <kk...@gmail.com>
Authored: Fri Mar 9 22:47:35 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:35:39 2018 +0200

----------------------------------------------------------------------
 .../flink/queryablestate/network/Client.java    |   2 +-
 .../server/KvStateServerHandler.java            |  26 ++-
 .../queryablestate/network/ClientTest.java      |   4 +-
 .../KVStateRequestSerializerRocksDBTest.java    |  16 +-
 .../network/KvStateRequestSerializerTest.java   |  30 ++-
 .../network/KvStateServerHandlerTest.java       |  46 +++-
 .../flink/runtime/query/KvStateEntry.java       |  75 +++++++
 .../apache/flink/runtime/query/KvStateInfo.java | 114 ++++++++++
 .../flink/runtime/query/KvStateRegistry.java    |  21 +-
 .../runtime/query/TaskKvStateRegistry.java      |   4 +-
 .../state/AbstractKeyedStateBackend.java        |  23 +-
 .../state/heap/AbstractHeapMergingState.java    |   9 +-
 .../runtime/state/heap/AbstractHeapState.java   |  31 ++-
 .../state/heap/HeapAggregatingState.java        |  19 +-
 .../runtime/state/heap/HeapFoldingState.java    |  20 +-
 .../state/heap/HeapKeyedStateBackend.java       |  16 +-
 .../flink/runtime/state/heap/HeapListState.java |  47 +++-
 .../flink/runtime/state/heap/HeapMapState.java  |  55 +++--
 .../runtime/state/heap/HeapReducingState.java   |  20 +-
 .../runtime/state/heap/HeapValueState.java      |  20 +-
 .../internal/InternalAggregatingState.java      |  13 +-
 .../state/internal/InternalAppendingState.java  |  12 +-
 .../state/internal/InternalFoldingState.java    |   5 +-
 .../runtime/state/internal/InternalKvState.java |  37 +++-
 .../state/internal/InternalListState.java       |   6 +-
 .../state/internal/InternalMapState.java        |   5 +-
 .../state/internal/InternalMergingState.java    |  12 +-
 .../state/internal/InternalReducingState.java   |   5 +-
 .../state/internal/InternalValueState.java      |   5 +-
 .../runtime/query/KvStateRegistryTest.java      | 212 ++++++++++++++++++-
 .../runtime/state/StateBackendTestBase.java     |  96 +++++----
 .../state/StateSnapshotCompressionTest.java     |   4 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   2 +-
 .../streaming/state/AbstractRocksDBState.java   |  58 +++--
 .../state/RocksDBAggregatingState.java          |  19 +-
 .../streaming/state/RocksDBFoldingState.java    |  20 +-
 .../state/RocksDBKeyedStateBackend.java         |  12 +-
 .../streaming/state/RocksDBListState.java       |  19 +-
 .../streaming/state/RocksDBMapState.java        | 152 +++++++++----
 .../streaming/state/RocksDBReducingState.java   |  19 +-
 .../streaming/state/RocksDBValueState.java      |  19 +-
 .../windowing/EvictingWindowOperator.java       |   4 +-
 .../operators/windowing/WindowOperator.java     |  14 +-
 .../operators/windowing/TriggerTestHarness.java |   2 +-
 44 files changed, 1074 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 364f835..6b60492 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -281,7 +281,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 * @param request the request to be sent.
 		 * @return Future holding the serialized result
 		 */
-		public CompletableFuture<RESP> sendRequest(REQ request) {
+		CompletableFuture<RESP> sendRequest(REQ request) {
 			synchronized (connectLock) {
 				if (failureCause != null) {
 					return FutureUtils.getFailedFuture(failureCause);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 18a2944..d46deff 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -26,6 +26,8 @@ import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.AbstractServerHandler;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateEntry;
+import org.apache.flink.runtime.query.KvStateInfo;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
@@ -33,9 +35,6 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -50,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
 @ChannelHandler.Sharable
 public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
-
 	/** KvState registry holding references to the KvState instances. */
 	private final KvStateRegistry registry;
 
@@ -78,13 +75,13 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
 
 		try {
-			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
 			if (kvState == null) {
 				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
 			} else {
 				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
 
-				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+				byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
 				if (serializedResult != null) {
 					responseFuture.complete(new KvStateResponse(serializedResult));
 				} else {
@@ -100,6 +97,21 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 		}
 	}
 
+	private static <K, N, V> byte[] getSerializedValue(
+			final KvStateEntry<K, N, V> entry,
+			final byte[] serializedKeyAndNamespace) throws Exception {
+
+		final InternalKvState<K, N, V> state = entry.getState();
+		final KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
+
+		return state.getSerializedValue(
+				serializedKeyAndNamespace,
+				infoForCurrentThread.getKeySerializer(),
+				infoForCurrentThread.getNamespaceSerializer(),
+				infoForCurrentThread.getStateValueSerializer()
+		);
+	}
+
 	@Override
 	public CompletableFuture<Void> shutdown() {
 		return CompletableFuture.completedFuture(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 6aa4710..bceb361 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -685,8 +685,8 @@ public class ClientTest {
 
 				state.update(201 + i);
 
-				// we know it must be a KvStat but this is not exposed to the user via State
-				InternalKvState<?> kvState = (InternalKvState<?>) state;
+				// we know it must be a KvState but this is not exposed to the user via State
+				InternalKvState<Integer, ?, Integer> kvState = (InternalKvState<Integer, ?, Integer>) state;
 
 				// Register KvState (one state instance for all server)
 				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index dd75dd6..4985bf3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -41,6 +41,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
 import java.io.File;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 
@@ -82,7 +83,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		}
 
 		@Override
-		public <N, T> InternalListState<N, T> createListState(
+		public <N, T> InternalListState<K, N, T> createListState(
 			final TypeSerializer<N> namespaceSerializer,
 			final ListStateDescriptor<T> stateDesc) throws Exception {
 
@@ -120,7 +121,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
 			.createListState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
@@ -159,11 +160,12 @@ public final class KVStateRequestSerializerRocksDBTest {
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
-				longHeapKeyedStateBackend.getPartitionedState(
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE,
-						new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+		final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
+						longHeapKeyedStateBackend.getPartitionedState(
+								VoidNamespace.INSTANCE,
+								VoidNamespaceSerializer.INSTANCE,
+								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
 
 		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
 		longHeapKeyedStateBackend.dispose();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 8d10141..dac1b90 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,7 +200,7 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
 				VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
@@ -220,7 +220,7 @@ public class KvStateRequestSerializerTest {
 	 */
 	public static void testListSerialization(
 			final long key,
-			final InternalListState<VoidNamespace, Long> listState) throws Exception {
+			final InternalListState<Long, VoidNamespace, Long> listState) throws Exception {
 
 		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
 		listState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -240,7 +240,11 @@ public class KvStateRequestSerializerTest {
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
 
-		final byte[] serializedValues = listState.getSerializedValue(serializedKey);
+		final byte[] serializedValues = listState.getSerializedValue(
+				serializedKey,
+				listState.getKeySerializer(),
+				listState.getNamespaceSerializer(),
+				listState.getValueSerializer());
 
 		List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
 		assertEquals(expectedValues, actualValues);
@@ -303,10 +307,12 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) longHeapKeyedStateBackend.getPartitionedState(
-				VoidNamespace.INSTANCE,
-				VoidNamespaceSerializer.INSTANCE,
-				new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+		final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
+						longHeapKeyedStateBackend.getPartitionedState(
+								VoidNamespace.INSTANCE,
+								VoidNamespaceSerializer.INSTANCE,
+								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
 
 		testMapSerialization(key, mapState);
 	}
@@ -322,9 +328,9 @@ public class KvStateRequestSerializerTest {
 	 *
 	 * @throws Exception
 	 */
-	public static void testMapSerialization(
+	public static <M extends Map<Long, String>> void testMapSerialization(
 			final long key,
-			final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+			final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
 
 		TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
 		TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
@@ -348,7 +354,11 @@ public class KvStateRequestSerializerTest {
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
 
-		final byte[] serializedValues = mapState.getSerializedValue(serializedKey);
+		final byte[] serializedValues = mapState.getSerializedValue(
+				serializedKey,
+				mapState.getKeySerializer(),
+				mapState.getNamespaceSerializer(),
+				mapState.getValueSerializer());
 
 		Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
 		assertEquals(expectedValues.size(), actualValues.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 8b1517c..9947dac 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.queryablestate.KvStateID;
@@ -70,9 +72,6 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link KvStateServerHandler}.
@@ -286,7 +285,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call.
+	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)} call.
 	 */
 	@Test
 	public void testFailureOnGetSerializedValue() throws Exception {
@@ -300,9 +299,42 @@ public class KvStateServerHandlerTest extends TestLogger {
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Failing KvState
-		InternalKvState<?> kvState = mock(InternalKvState.class);
-		when(kvState.getSerializedValue(any(byte[].class)))
-				.thenThrow(new RuntimeException("Expected test Exception"));
+		InternalKvState<Integer, VoidNamespace, Long> kvState =
+				new InternalKvState<Integer, VoidNamespace, Long>() {
+					@Override
+					public TypeSerializer<Integer> getKeySerializer() {
+						return IntSerializer.INSTANCE;
+					}
+
+					@Override
+					public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+						return VoidNamespaceSerializer.INSTANCE;
+					}
+
+					@Override
+					public TypeSerializer<Long> getValueSerializer() {
+						return LongSerializer.INSTANCE;
+					}
+
+					@Override
+					public void setCurrentNamespace(VoidNamespace namespace) {
+						// do nothing
+					}
+
+					@Override
+					public byte[] getSerializedValue(
+							final byte[] serializedKeyAndNamespace,
+							final TypeSerializer<Integer> safeKeySerializer,
+							final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+							final TypeSerializer<Long> safeValueSerializer) throws Exception {
+						throw new RuntimeException("Expected test Exception");
+					}
+
+					@Override
+					public void clear() {
+
+					}
+				};
 
 		KvStateID kvStateId = registry.registerKvState(
 				new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
new file mode 100644
index 0000000..0bd132f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+	private final InternalKvState<K, N, V> state;
+	private final KvStateInfo<K, N, V> stateInfo;
+
+	private final boolean areSerializersStateless;
+
+	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
+
+	public KvStateEntry(final InternalKvState<K, N, V> state) {
+		this.state = Preconditions.checkNotNull(state);
+		this.stateInfo = new KvStateInfo<>(
+				state.getKeySerializer(),
+				state.getNamespaceSerializer(),
+				state.getValueSerializer()
+		);
+		this.serializerCache = new ConcurrentHashMap<>();
+		this.areSerializersStateless = stateInfo.duplicate() == stateInfo;
+	}
+
+	public InternalKvState<K, N, V> getState() {
+		return state;
+	}
+
+	public KvStateInfo<K, N, V> getInfoForCurrentThread() {
+		return areSerializersStateless
+				? stateInfo
+				: serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
+	}
+
+	public void clear() {
+		serializerCache.clear();
+	}
+
+	@VisibleForTesting
+	public int getCacheSize() {
+		return serializerCache.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
new file mode 100644
index 0000000..aa94e41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Metadata about a {@link InternalKvState}. This includes the serializers for
+ * the key, the namespace, and the values kept in the state.
+ *
+ * @param <K>	The type of key the state is associated to
+ * @param <N>	The type of the namespace the state is associated to
+ * @param <V>	The type of values kept internally in state
+ */
+public class KvStateInfo<K, N, V> {
+
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<N> namespaceSerializer;
+	private final TypeSerializer<V> stateValueSerializer;
+
+	public KvStateInfo(
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<N> namespaceSerializer,
+			final TypeSerializer<V> stateValueSerializer
+	) {
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+		this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer);
+	}
+
+	/**
+	 * @return The serializer for the key the state is associated to.
+	 */
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * @return The serializer for the namespace the state is associated to.
+	 */
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	/**
+	 * @return The serializer for the values kept in the state.
+	 */
+	public TypeSerializer<V> getStateValueSerializer() {
+		return stateValueSerializer;
+	}
+
+	/**
+	 * Creates a deep copy of the current {@link KvStateInfo} by duplicating
+	 * all the included serializers.
+	 *
+	 * <p>This method assumes correct implementation of the {@link TypeSerializer#duplicate()}
+	 * method of the included serializers.
+	 */
+	public KvStateInfo<K, N, V> duplicate() {
+		final TypeSerializer<K> dupKeySerializer = keySerializer.duplicate();
+		final TypeSerializer<N> dupNamespaceSerializer = namespaceSerializer.duplicate();
+		final TypeSerializer<V> dupSVSerializer = stateValueSerializer.duplicate();
+
+		if (
+			dupKeySerializer == keySerializer &&
+			dupNamespaceSerializer == namespaceSerializer &&
+			dupSVSerializer == stateValueSerializer
+		) {
+			return this;
+		}
+
+		return new KvStateInfo<>(dupKeySerializer, dupNamespaceSerializer, dupSVSerializer);
+
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		KvStateInfo<?, ?, ?> stateInfo = (KvStateInfo<?, ?, ?>) o;
+		return Objects.equals(keySerializer, stateInfo.keySerializer) &&
+				Objects.equals(namespaceSerializer, stateInfo.namespaceSerializer) &&
+				Objects.equals(stateValueSerializer, stateInfo.stateValueSerializer);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(keySerializer, namespaceSerializer, stateValueSerializer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 2c55463..63d3c52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -41,8 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class KvStateRegistry {
 
 	/** All registered KvState instances. */
-	private final ConcurrentHashMap<KvStateID, InternalKvState<?>> registeredKvStates =
-			new ConcurrentHashMap<>();
+	private final ConcurrentHashMap<KvStateID, KvStateEntry<?, ?, ?>> registeredKvStates = new ConcurrentHashMap<>(4);
 
 	/** Registry listeners to be notified on registration/unregistration. */
 	private final ConcurrentHashMap<JobID, KvStateRegistryListener> listeners = new ConcurrentHashMap<>(4);
@@ -86,11 +85,11 @@ public class KvStateRegistry {
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
 			String registrationName,
-			InternalKvState<?> kvState) {
+			InternalKvState<?, ?, ?> kvState) {
 
 		KvStateID kvStateId = new KvStateID();
 
-		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
+		if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) {
 			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 
 			if (listener != null) {
@@ -123,7 +122,10 @@ public class KvStateRegistry {
 			String registrationName,
 			KvStateID kvStateId) {
 
-		if (registeredKvStates.remove(kvStateId) != null) {
+		KvStateEntry<?, ?, ?> entry = registeredKvStates.remove(kvStateId);
+		if (entry != null) {
+			entry.clear();
+
 			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
@@ -136,13 +138,13 @@ public class KvStateRegistry {
 	}
 
 	/**
-	 * Returns the KvState instance identified by the given KvStateID or
-	 * <code>null</code> if none is registered.
+	 * Returns the {@link KvStateEntry} containing the requested instance as identified by the
+	 * given KvStateID, along with its {@link KvStateInfo} or <code>null</code> if none is registered.
 	 *
 	 * @param kvStateId KvStateID to identify the KvState instance
-	 * @return KvState instance identified by the KvStateID or <code>null</code>
+	 * @return The {@link KvStateEntry} instance identified by the KvStateID or <code>null</code> if there is none
 	 */
-	public InternalKvState<?> getKvState(KvStateID kvStateId) {
+	public KvStateEntry<?, ?, ?> getKvState(KvStateID kvStateId) {
 		return registeredKvStates.get(kvStateId);
 	}
 
@@ -174,5 +176,4 @@ public class KvStateRegistry {
 		}
 		return listener;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index f799b5a..a44a508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -60,7 +60,7 @@ public class TaskKvStateRegistry {
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?> kvState) {
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?, ?, ?> kvState) {
 		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
 		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
 	}
@@ -85,7 +85,7 @@ public class TaskKvStateRegistry {
 
 		private final KvStateID kvStateId;
 
-		public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+		KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
 			this.keyGroupRange = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e7b3a1a..287474c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,6 +53,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,7 +80,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	private int currentKeyGroup;
 
 	/** So that we can give out state when the user uses the same key. */
-	protected final HashMap<String, InternalKvState<?>> keyValueStatesByName;
+	protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
 
 	/** For caching the last accessed partitioned state */
 	private String lastName;
@@ -161,7 +162,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the value that the {@code ValueState} can store.
 	 */
-	protected abstract <N, T> InternalValueState<N, T> createValueState(
+	protected abstract <N, T> InternalValueState<K, N, T> createValueState(
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<T> stateDesc) throws Exception;
 
@@ -174,7 +175,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> InternalListState<N, T> createListState(
+	protected abstract <N, T> InternalListState<K, N, T> createListState(
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception;
 
@@ -187,7 +188,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> InternalReducingState<N, T> createReducingState(
+	protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception;
 
@@ -200,7 +201,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+	protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
 			TypeSerializer<N> namespaceSerializer,
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
 
@@ -217,7 +218,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @deprecated will be removed in a future version
 	 */
 	@Deprecated
-	protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+	protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
@@ -231,7 +232,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <UK> Type of the keys in the state
 	 * @param <UV> Type of the values in the state	 *
 	 */
-	protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
 			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
 
@@ -336,7 +337,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 		}
 
-		InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
+		InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
 		if (existing != null) {
 			@SuppressWarnings("unchecked")
 			S typedState = (S) existing;
@@ -379,7 +380,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		});
 
 		@SuppressWarnings("unchecked")
-		InternalKvState<N> kvState = (InternalKvState<N>) state;
+		InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
 		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
 
 		// Publish queryable state
@@ -416,7 +417,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			return (S) lastState;
 		}
 
-		InternalKvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+		InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
 		if (previous != null) {
 			lastState = previous;
 			lastState.setCurrentNamespace(namespace);
@@ -425,7 +426,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		}
 
 		final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
-		final InternalKvState<N> kvState = (InternalKvState<N>) state;
+		final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
 
 		lastName = stateDescriptor.getName();
 		lastState = kvState;

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 3e76423..df762b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -28,18 +28,19 @@ import org.apache.flink.runtime.state.internal.InternalMergingState;
 import java.util.Collection;
 
 /**
- * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
- * that is stored on the heap.
+ * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
+ * @param <IN> The type of the input elements.
  * @param <SV> The type of the values in the state.
+ * @param <OUT> The type of the output elements.
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State, SD extends StateDescriptor<S, SV>>
 		extends AbstractHeapState<K, N, SV, S, SD>
-		implements InternalMergingState<N, IN, OUT> {
+		implements InternalMergingState<K, N, IN, SV, OUT> {
 
 	/**
 	 * The merge transformation function that implements the merge logic.

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 7f629ae..e889c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -37,8 +37,7 @@ import org.apache.flink.util.Preconditions;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
-		implements InternalKvState<N> {
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<K, N, SV> {
 
 	/** Map containing the actual key/value pairs. */
 	protected final StateTable<K, N, SV> stateTable;
@@ -86,28 +85,26 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	}
 
 	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<SV> safeValueSerializer) throws Exception {
 
-		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
-				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
-
-		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
-	}
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
-	public byte[] getSerializedValue(K key, N namespace) throws Exception {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		SV result = stateTable.get(key, namespace);
+		SV result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
 
 		if (result == null) {
 			return null;
 		}
-
-		@SuppressWarnings("unchecked,rawtypes")
-		TypeSerializer serializer = stateDesc.getSerializer();
-		return KvStateSerializer.serializeValue(result, serializer);
+		return KvStateSerializer.serializeValue(result, safeValueSerializer);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 6dd5cec..8e58ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -39,8 +39,8 @@ import java.io.IOException;
  * @param <OUT> The type of the value returned from the state.
  */
 public class HeapAggregatingState<K, N, IN, ACC, OUT>
-		extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
-		implements InternalAggregatingState<N, IN, OUT> {
+		extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+		implements InternalAggregatingState<K, N, IN, ACC, OUT> {
 
 	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
 
@@ -64,6 +64,21 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 		this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 3a77cca..ed1d0de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link FoldingState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -42,7 +41,7 @@ import java.io.IOException;
 @Deprecated
 public class HeapFoldingState<K, N, T, ACC>
 		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
-		implements InternalFoldingState<N, T, ACC> {
+		implements InternalFoldingState<K, N, T, ACC> {
 
 	/** The function used to fold the state */
 	private final FoldTransformation<T, ACC> foldTransformation;
@@ -63,6 +62,21 @@ public class HeapFoldingState<K, N, T, ACC>
 		this.foldTransformation = new FoldTransformation<>(stateDesc);
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 63eb33b..82f883c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -92,8 +92,7 @@ import java.util.stream.Stream;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
- * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing.
+ * streams provided by a {@link CheckpointStreamFactory} upon checkpointing.
  *
  * @param <K> The key by which state is keyed.
  */
@@ -247,7 +246,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, V> InternalValueState<N, V> createValueState(
+	public <N, V> InternalValueState<K, N, V> createValueState(
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<V> stateDesc) throws Exception {
 
@@ -256,7 +255,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T> InternalListState<N, T> createListState(
+	public <N, T> InternalListState<K, N, T> createListState(
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
@@ -265,7 +264,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T> InternalReducingState<N, T> createReducingState(
+	public <N, T> InternalReducingState<K, N, T> createReducingState(
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
@@ -274,7 +273,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
 			TypeSerializer<N> namespaceSerializer,
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
@@ -283,7 +282,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
@@ -292,7 +291,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
+	protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 
 		StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f7b5cd2..bd68560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
@@ -30,16 +33,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ListState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
-		implements InternalListState<N, V> {
+		extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>, ListStateDescriptor<V>>
+		implements InternalListState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -56,6 +58,21 @@ public class HeapListState<K, N, V>
 		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<List<V>> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------
@@ -82,24 +99,34 @@ public class HeapListState<K, N, V>
 	}
 
 	@Override
-	public byte[] getSerializedValue(K key, N namespace) throws Exception {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<List<V>> safeValueSerializer) throws Exception {
+
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
+
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		List<V> result = stateTable.get(key, namespace);
+		List<V> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
 
 		if (result == null) {
 			return null;
 		}
 
-		TypeSerializer<V> serializer = stateDesc.getElementSerializer();
+		final TypeSerializer<V> dupSerializer = ((ListSerializer<V>) safeValueSerializer).getElementSerializer();
 
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
 
 		// write the same as RocksDB writes lists, with one ',' separator
 		for (int i = 0; i < result.size(); i++) {
-			serializer.serialize(result.get(i), view);
+			dupSerializer.serialize(result.get(i), view);
 			if (i < result.size() -1) {
 				view.writeByte(',');
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 206f10a..7c18071 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.state.HashMapSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -33,14 +34,14 @@ import java.util.Map;
 /**
  * Heap-backed partitioned {@link MapState} that is snapshotted into files.
  *
- * @param <K>  The type of the key.
- * @param <N>  The type of the namespace.
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
  * @param <UK> The type of the keys in the state.
  * @param <UV> The type of the values in the state.
  */
 public class HeapMapState<K, N, UK, UV>
 		extends AbstractHeapState<K, N, HashMap<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
-		implements InternalMapState<N, UK, UV> {
+		implements InternalMapState<K, N, UK, UV, HashMap<UK, UV>> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -58,6 +59,24 @@ public class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<HashMap<UK, UV>> getValueSerializer() {
+		return new HashMapSerializer<>(
+				stateDesc.getKeySerializer(),
+				stateDesc.getValueSerializer()
+		);
+	}
+
+	@Override
 	public UV get(UK userKey) {
 
 		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
@@ -140,19 +159,31 @@ public class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public byte[] getSerializedValue(K key, N namespace) throws IOException {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<HashMap<UK, UV>> safeValueSerializer) throws Exception {
 
-		HashMap<UK, UV> result = stateTable.get(key, namespace);
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
-		if (null == result) {
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
+
+		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
+
+		if (result == null) {
 			return null;
 		}
 
-		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
-		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
+		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) safeValueSerializer;
+
+		final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
+		final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
 
-		return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer);
+		return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 6e11327..58b3128 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link ReducingState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -38,7 +37,7 @@ import java.io.IOException;
  */
 public class HeapReducingState<K, N, V>
 		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
-		implements InternalReducingState<N, V> {
+		implements InternalReducingState<K, N, V> {
 
 	private final ReduceTransformation<V> reduceTransformation;
 
@@ -59,6 +58,21 @@ public class HeapReducingState<K, N, V>
 		this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 6de62a8..bf0a3cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ValueState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -33,7 +32,7 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
  */
 public class HeapValueState<K, N, V>
 		extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
-		implements InternalValueState<N, V> {
+		implements InternalValueState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -51,6 +50,21 @@ public class HeapValueState<K, N, V>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public V value() {
 		final V result = stateTable.get(currentNamespace);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
index 15a8e31..b66404c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
@@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.AggregatingState;
  * The peer to the {@link AggregatingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  Type of the value added to the state.
- * @param <OUT> Type of the value extracted from the state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> Type of the value extracted from the state
  */
-public interface InternalAggregatingState<N, IN, OUT> 
-		extends InternalMergingState<N, IN, OUT>, AggregatingState<IN, OUT> {}
+public interface InternalAggregatingState<K, N, IN, SV, OUT> extends InternalMergingState<K, N, IN, SV, OUT>, AggregatingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
index ae9f457..3cb84af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.state.AppendingState;
  * The peer to the {@link AppendingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  The type of elements added to the state
- * @param <OUT> The type of the 
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of the resulting element in the state
  */
-public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {}
+public interface InternalAppendingState<K, N, IN, SV, OUT> extends InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index 4ef258f..ed53d82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.state.FoldingState;
  * The peer to the {@link FoldingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
@@ -32,4 +33,4 @@ import org.apache.flink.api.common.state.FoldingState;
  * @deprecated will be removed in a future version
  */
 @Deprecated
-public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}
+public interface InternalFoldingState<K, N, T, ACC> extends InternalAppendingState<K, N, T, ACC, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
index 06f64b6..1310dd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
@@ -52,10 +53,27 @@ import org.apache.flink.api.common.state.State;
  *                  |                |
  *                  +---------InternalReducingState
  * </pre>
- * 
- * @param <N> The type of the namespace.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <V> The type of values kept internally in state
  */
-public interface InternalKvState<N> extends State {
+public interface InternalKvState<K, N, V> extends State {
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of key this state is associated to.
+	 */
+	TypeSerializer<K> getKeySerializer();
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of namespace this state is associated to.
+	 */
+	TypeSerializer<N> getNamespaceSerializer();
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of value this state holds.
+	 */
+	TypeSerializer<V> getValueSerializer();
 
 	/**
 	 * Sets the current namespace, which will be used when using the state access methods.
@@ -70,10 +88,21 @@ public interface InternalKvState<N> extends State {
 	 * <p>If no value is associated with key and namespace, <code>null</code>
 	 * is returned.
 	 *
+	 * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
+	 * stateful (e.g. serializers) should be either duplicated or protected from undesired
+	 * consequences of concurrent invocations.
+	 *
 	 * @param serializedKeyAndNamespace Serialized key and namespace
+	 * @param safeKeySerializer A key serializer which is safe to be used even in multi-threaded context
+	 * @param safeNamespaceSerializer A namespace serializer which is safe to be used even in multi-threaded context
+	 * @param safeValueSerializer A value serializer which is safe to be used even in multi-threaded context
 	 * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
 	 * 
 	 * @throws Exception Exceptions during serialization are forwarded
 	 */
-	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+	byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<V> safeValueSerializer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index 1e22dc6..1d6653b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -26,11 +26,13 @@ import java.util.List;
  * The peer to the {@link ListState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the list
  */
-public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
+public interface InternalListState<K, N, T> extends InternalMergingState<K, N, T, List<T>, Iterable<T>>, ListState<T> {
+
 	/**
 	 * Updates the operator state accessible by {@link #get()} by updating existing values to
 	 * to the given list of values. The next time {@link #get()} is called (for the same state

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
index f2a7b41..91f698c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.MapState;
 
+import java.util.Map;
+
 /**
  * The peer to the {@link MapState} in the internal state type hierarchy.
  *
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
  *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <UK> Type of the values folded into the state
  * @param <UV> Type of the value in the state
  */
-public interface InternalMapState<N, UK, UV> extends InternalKvState<N>, MapState<UK, UV> {}
+public interface InternalMapState<K, N, UK, UV, ST extends Map<UK, UV>> extends InternalKvState<K, N, ST>, MapState<UK, UV> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
index abc7d7c..2c72697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -26,12 +26,14 @@ import java.util.Collection;
  * The peer to the {@link MergingState} in the internal state type hierarchy.
  * 
  * See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  The type of elements added to the state
- * @param <OUT> The type of elements 
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of elements
  */
-public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+public interface InternalMergingState<K, N, IN, SV, OUT> extends InternalAppendingState<K, N, IN, SV, OUT>, MergingState<IN, OUT> {
 
 	/**
 	 * Merges the state of the current key for the given source namespaces into the state of

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
index 76fa58f..f7bff2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ReducingState;
  * The peer to the {@link ReducingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the aggregated by the ReduceFunction
  */
-public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}
+public interface InternalReducingState<K, N, T> extends InternalMergingState<K, N, T, T, T>, ReducingState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
index 7177b8a..169cdba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ValueState;
  * The peer to the {@link ValueState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the list
  */
-public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {}
+public interface InternalValueState<K, N, T> extends InternalKvState<K, N, T>, ValueState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index 43aa1d1..36a85d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,27 +19,126 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link KvStateRegistry}.
  */
 public class KvStateRegistryTest extends TestLogger {
 
+	@Test
+	public void testKvStateEntry() throws InterruptedException {
+		final int threads = 10;
+
+		final CountDownLatch latch1 = new CountDownLatch(threads);
+		final CountDownLatch latch2 = new CountDownLatch(1);
+
+		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
+
+		final JobID jobID = new JobID();
+
+		final JobVertexID jobVertexId = new JobVertexID();
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+		final String registrationName = "foobar";
+
+		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+		final KvStateID stateID = kvStateRegistry.registerKvState(
+				jobID,
+				jobVertexId,
+				keyGroupRange,
+				registrationName,
+				new DummyKvState()
+		);
+
+		final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
+
+		for (int i = 0; i < threads; i++) {
+			new Thread(() -> {
+				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
+				infos.add(stateInfo);
+
+				latch1.countDown();
+				try {
+					latch2.await();
+				} catch (InterruptedException e) {
+					// compare and set, so that we do not overwrite an exception
+					// that was (potentially) already encountered.
+					exceptionHolder.compareAndSet(null, e);
+				}
+
+			}).start();
+		}
+
+		latch1.await();
+
+		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+
+		// verify that all the threads are done correctly.
+		Assert.assertEquals(threads, infos.size());
+		Assert.assertEquals(threads, kvState.getCacheSize());
+
+		latch2.countDown();
+
+		for (KvStateInfo<?, ?, ?> infoA: infos) {
+			boolean instanceAlreadyFound = false;
+			for (KvStateInfo<?, ?, ?> infoB: infos) {
+				if (infoA == infoB) {
+					if (instanceAlreadyFound) {
+						Assert.fail("More than one thread sharing the same serializer instance.");
+					}
+					instanceAlreadyFound = true;
+				} else {
+					Assert.assertEquals(infoA, infoB);
+				}
+			}
+		}
+
+		kvStateRegistry.unregisterKvState(
+				jobID,
+				jobVertexId,
+				keyGroupRange,
+				registrationName,
+				stateID);
+
+		Assert.assertEquals(0L, kvState.getCacheSize());
+
+		Throwable t = exceptionHolder.get();
+		if (t != null) {
+			fail(t.getMessage());
+		}
+	}
+
 	/**
 	 * Tests that {@link KvStateRegistryListener} only receive the notifications which
 	 * are destined for them.
@@ -74,7 +173,7 @@ public class KvStateRegistryTest extends TestLogger {
 			jobVertexId,
 			keyGroupRange,
 			registrationName,
-			new DummyKvState<>());
+			new DummyKvState());
 
 		assertThat(registeredNotifications1.poll(), equalTo(jobId1));
 		assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -87,7 +186,7 @@ public class KvStateRegistryTest extends TestLogger {
 			jobVertexId2,
 			keyGroupRange2,
 			registrationName2,
-			new DummyKvState<>());
+			new DummyKvState());
 
 		assertThat(registeredNotifications2.poll(), equalTo(jobId2));
 		assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -191,18 +290,35 @@ public class KvStateRegistryTest extends TestLogger {
 
 	/**
 	 * Testing implementation of {@link InternalKvState}.
-	 *
-	 * @param <T> type of the state
 	 */
-	private static final class DummyKvState<T> implements InternalKvState<T> {
+	private static class DummyKvState implements InternalKvState<Integer, VoidNamespace, String> {
 
 		@Override
-		public void setCurrentNamespace(Object namespace) {
+		public TypeSerializer<Integer> getKeySerializer() {
+			return IntSerializer.INSTANCE;
+		}
+
+		@Override
+		public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+			return VoidNamespaceSerializer.INSTANCE;
+		}
+
+		@Override
+		public TypeSerializer<String> getValueSerializer() {
+			return new DeepCopyingStringSerializer();
+		}
+
+		@Override
+		public void setCurrentNamespace(VoidNamespace namespace) {
 			// noop
 		}
 
 		@Override
-		public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+		public byte[] getSerializedValue(
+				final byte[] serializedKeyAndNamespace,
+				final TypeSerializer<Integer> safeKeySerializer,
+				final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+				final TypeSerializer<String> safeValueSerializer) throws Exception {
 			return serializedKeyAndNamespace;
 		}
 
@@ -212,4 +328,86 @@ public class KvStateRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * A dummy serializer that just returns another instance when .duplicate().
+	 */
+	private static class DeepCopyingStringSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -3744051158625555607L;
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return new DeepCopyingStringSerializer();
+		}
+
+		@Override
+		public String createInstance() {
+			return null;
+		}
+
+		@Override
+		public String copy(String from) {
+			return null;
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return null;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return null;
+		}
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return null;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof DeepCopyingStringSerializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return null;
+		}
+
+		@Override
+		public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return null;
+		}
+	}
 }


[2/5] flink git commit: [FLINK-8928] [QS] Improve server binding error message.

Posted by kk...@apache.org.
[FLINK-8928] [QS] Improve server binding error message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a8172a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a8172a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a8172a9

Branch: refs/heads/master
Commit: 6a8172a9c654589faa01f1fccb2dec5e008fe532
Parents: eea887b
Author: kkloudas <kk...@gmail.com>
Authored: Mon Mar 12 12:12:06 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:33:07 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/queryablestate/network/AbstractServerBase.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a8172a9/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index d5afeb3..2e67b98 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -194,7 +194,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 		if (serverAddress != null) {
 			log.info("Started {} @ {}.", serverName, serverAddress);
 		} else {
-			log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+			log.info("Unable to start {}. All ports in provided range ({}) are occupied.", serverName, bindPortRange);
 			throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
 		}
 	}


[3/5] flink git commit: [FLINK-8926] [QS] Shutdown client proxy after test ends.

Posted by kk...@apache.org.
[FLINK-8926] [QS] Shutdown client proxy after test ends.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c16e2c9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c16e2c9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c16e2c9b

Branch: refs/heads/master
Commit: c16e2c9b23fe0f287041a3eff51c7ee221da0dc5
Parents: 6a8172a
Author: kkloudas <kk...@gmail.com>
Authored: Mon Mar 12 13:19:23 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:33:20 2018 +0200

----------------------------------------------------------------------
 .../client/proxy/KvStateClientProxyImplTest.java               | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c16e2c9b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
index fc8b8da..cd33255 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,6 +54,11 @@ public class KvStateClientProxyImplTest extends TestLogger {
 			new DisabledKvStateRequestStats());
 	}
 
+	@After
+	public void shutdown() {
+		kvStateClientProxy.shutdown();
+	}
+
 	/**
 	 * Tests that we can set and retrieve the {@link KvStateLocationOracle}.
 	 */