You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 12:42:25 UTC

[1/2] flink git commit: [FLINK-9571] Replace StateBinder with internal backend-specific state factories

Repository: flink
Updated Branches:
  refs/heads/master 0e9b066aa -> 0bdde8377


http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 21a211e..aac5240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -41,7 +43,6 @@ import java.util.List;
 class HeapListState<K, N, V>
 	extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>>
 	implements InternalListState<K, N, V> {
-
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
@@ -51,7 +52,7 @@ class HeapListState<K, N, V>
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param defaultValue The default value for the state.
 	 */
-	HeapListState(
+	private HeapListState(
 		StateTable<K, N, List<V>> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<List<V>> valueSerializer,
@@ -183,4 +184,17 @@ class HeapListState<K, N, V>
 			});
 		}
 	}
+
+	@SuppressWarnings("unchecked") // covers the unchecked generic casts in the body below
+	static <E, K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // only its default value is read here
+		StateTable<K, N, SV> stateTable, // also supplies the state and namespace serializers
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapListState<>(
+			(StateTable<K, N, List<E>>) stateTable, // SV is treated as List<E>; unchecked, presumably ensured by the caller
+			keySerializer,
+			(TypeSerializer<List<E>>) stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			(List<E>) stateDesc.getDefaultValue());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 34e5f8e..745e7f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -51,7 +53,7 @@ class HeapMapState<K, N, UK, UV>
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param defaultValue The default value for the state.
 	 */
-	HeapMapState(
+	private HeapMapState(
 		StateTable<K, N, Map<UK, UV>> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<Map<UK, UV>> valueSerializer,
@@ -85,7 +87,7 @@ class HeapMapState<K, N, UK, UV>
 		if (userMap == null) {
 			return null;
 		}
-		
+
 		return userMap.get(userKey);
 	}
 
@@ -140,7 +142,7 @@ class HeapMapState<K, N, UK, UV>
 		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.entrySet();
 	}
-	
+
 	@Override
 	public Iterable<UK> keys() {
 		Map<UK, UV> userMap = stateTable.get(currentNamespace);
@@ -187,4 +189,17 @@ class HeapMapState<K, N, UK, UV>
 
 		return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
 	}
+
+	@SuppressWarnings("unchecked") // covers the unchecked generic casts in the body below
+	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // only its default value is read here
+		StateTable<K, N, SV> stateTable, // also supplies the state and namespace serializers
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapMapState<>(
+			(StateTable<K, N, Map<UK, UV>>) stateTable, // SV is treated as Map<UK, UV>; unchecked, presumably ensured by the caller
+			keySerializer,
+			(TypeSerializer<Map<UK, UV>>) stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			(Map<UK, UV>) stateDesc.getDefaultValue());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index b9cee8d..7e9bff0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -50,13 +53,13 @@ class HeapReducingState<K, N, V>
 	 * @param defaultValue The default value for the state.
 	 * @param reduceFunction The reduce function used for reducing state.
 	 */
-	public HeapReducingState(
-			StateTable<K, N, V> stateTable,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<V> valueSerializer,
-			TypeSerializer<N> namespaceSerializer,
-			V defaultValue,
-			ReduceFunction<V> reduceFunction) {
+	private HeapReducingState(
+		StateTable<K, N, V> stateTable,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<V> valueSerializer,
+		TypeSerializer<N> namespaceSerializer,
+		V defaultValue,
+		ReduceFunction<V> reduceFunction) {
 
 		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
 		this.reduceTransformation = new ReduceTransformation<>(reduceFunction);
@@ -123,4 +126,18 @@ class HeapReducingState<K, N, V>
 			return previousState != null ? reduceFunction.reduce(previousState, value) : value;
 		}
 	}
+
+	@SuppressWarnings("unchecked") // covers the cast of the result to IS and the descriptor downcast
+	static <K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // supplies the default value and the reduce function
+		StateTable<K, N, SV> stateTable, // also supplies the state and namespace serializers
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapReducingState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			stateDesc.getDefaultValue(),
+			((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction()); // downcast; presumably only invoked with a ReducingStateDescriptor
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 9e0687f..55b9c32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
@@ -42,7 +44,7 @@ class HeapValueState<K, N, V>
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param defaultValue The default value for the state.
 	 */
-	HeapValueState(
+	private HeapValueState(
 		StateTable<K, N, V> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<V> valueSerializer,
@@ -87,4 +89,17 @@ class HeapValueState<K, N, V>
 
 		stateTable.put(currentNamespace, value);
 	}
+
+	@SuppressWarnings("unchecked") // covers the cast of the new state object to IS
+	static <K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // only its default value is read here
+		StateTable<K, N, SV> stateTable, // also supplies the state and namespace serializers
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapValueState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			stateDesc.getDefaultValue());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 37e2714..ad67171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3545,7 +3545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			// insert some data to the backend.
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
 				VoidNamespaceSerializer.INSTANCE,
 				new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3602,7 +3602,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			backend = createKeyedBackend(IntSerializer.INSTANCE);
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3649,7 +3649,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		try {
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
 
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3791,7 +3791,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				return;
 			}
 
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index e646c64..3c06b71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -120,9 +120,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		try {
 
 			InternalValueState<String, VoidNamespace, String> state =
-				stateBackend.createValueState(
-					new VoidNamespaceSerializer(),
-					stateDescriptor);
+				stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor);
 
 			stateBackend.setCurrentKey("A");
 			state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -163,7 +161,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 
 			stateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
-			InternalValueState<String, VoidNamespace, String> state = stateBackend.createValueState(
+			InternalValueState<String, VoidNamespace, String> state = stateBackend.createState(
 				new VoidNamespaceSerializer(),
 				stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 345cd4f..249d0c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 
 			keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
 
-			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
 
 			keyedBackend.setCurrentKey("abc");
 			state.setCurrentNamespace(namespace1);
@@ -209,7 +209,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 	/**
 	 * [FLINK-5979]
 	 *
-	 * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
+	 * <p>This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
 	 * the backwards compatibility of the serialization format of {@link StateTable}s.
 	 */
 	@Test
@@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 			final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
 			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			InternalListState<String, Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+			InternalListState<String, Integer, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
 
 			assertEquals(7, keyedBackend.numStateEntries());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 76f59d1..ceae3e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -20,9 +20,14 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -39,9 +44,9 @@ import java.util.Collection;
  * @param <ACC> The type of the value stored in the state (the accumulator type)
  * @param <R> The type of the value returned from the state
  */
-public class RocksDBAggregatingState<K, N, T, ACC, R>
-		extends AbstractRocksDBAppendingState<K, N, T, ACC, R, AggregatingState<T, R>>
-		implements InternalAggregatingState<K, N, T, ACC, R> {
+class RocksDBAggregatingState<K, N, T, ACC, R>
+	extends AbstractRocksDBAppendingState<K, N, T, ACC, R, AggregatingState<T, R>>
+	implements InternalAggregatingState<K, N, T, ACC, R> {
 
 	/** User-specified aggregation function. */
 	private final AggregateFunction<T, ACC, R> aggFunction;
@@ -56,7 +61,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 	 * @param aggFunction The aggregate function used for aggregating state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBAggregatingState(
+	private RocksDBAggregatingState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<ACC> valueSerializer,
@@ -168,4 +173,18 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 			throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
 		}
 	}
+
+	@SuppressWarnings("unchecked") // covers the cast of the result to IS and the descriptor downcast
+	static <K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // supplies the default value and the aggregate function
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult, // f0: column family handle, f1: registered meta info
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBAggregatingState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(), // serializers come from the registered meta info
+			registerResult.f1.getStateSerializer(),
+			stateDesc.getDefaultValue(),
+			((AggregatingStateDescriptor<?, SV, ?>) stateDesc).getAggregateFunction(), // downcast; presumably only invoked with an AggregatingStateDescriptor
+			backend);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index e2ef32b..d814c26 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -20,7 +20,12 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -36,9 +41,9 @@ import org.rocksdb.ColumnFamilyHandle;
  * @deprecated will be removed in a future version
  */
 @Deprecated
-public class RocksDBFoldingState<K, N, T, ACC>
-		extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, FoldingState<T, ACC>>
-		implements InternalFoldingState<K, N, T, ACC> {
+class RocksDBFoldingState<K, N, T, ACC>
+	extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, FoldingState<T, ACC>>
+	implements InternalFoldingState<K, N, T, ACC> {
 
 	/** User-specified fold function. */
 	private final FoldFunction<T, ACC> foldFunction;
@@ -53,7 +58,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	 * @param foldFunction The fold function used for folding state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
+	private RocksDBFoldingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<ACC> valueSerializer,
 			ACC defaultValue,
@@ -93,4 +98,18 @@ public class RocksDBFoldingState<K, N, T, ACC>
 		accumulator = foldFunction.fold(accumulator, value);
 		updateInternal(key, accumulator);
 	}
+
+	@SuppressWarnings("unchecked") // covers the cast of the result to IS and the descriptor downcast
+	static <K, N, SV, S extends State, IS extends S> IS create( // static factory; the constructor is private in this change
+		StateDescriptor<S, SV> stateDesc, // supplies the default value and the fold function
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult, // f0: column family handle, f1: registered meta info
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBFoldingState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(), // serializers come from the registered meta info
+			registerResult.f1.getStateSerializer(),
+			stateDesc.getDefaultValue(),
+			((FoldingStateDescriptor<?, SV>) stateDesc).getFoldFunction(), // downcast; presumably only invoked with a FoldingStateDescriptor
+			backend);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f997f8d..e5f443a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
@@ -76,12 +77,6 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -133,6 +128,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -156,6 +152,23 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** File suffix of sstable files. */
 	private static final String SST_FILE_SUFFIX = ".sst";
 
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = // descriptor class -> factory creating the matching RocksDB state
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create), // the cast gives the method reference its target type
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) RocksDBListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) RocksDBMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) RocksDBAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) RocksDBReducingState::create),
+			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) RocksDBFoldingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); // key is the descriptor class (f0), value is the factory (f1)
+
+	private interface StateFactory { // single-method interface; the static create methods are bound to it via method references
+		<K, N, SV, S extends State, IS extends S> IS createState( // generic method, so callers pick the concrete state type IS
+			StateDescriptor<S, SV> stateDesc,
+			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult, // column family handle plus registered meta info
+			RocksDBKeyedStateBackend<K> backend) throws Exception;
+	}
+
 	/** String that identifies the operator that owns this backend. */
 	private final String operatorIdentifier;
 
@@ -1303,103 +1316,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, T> InternalValueState<K, N, T> createValueState(
-		TypeSerializer<N> namespaceSerializer,
-		ValueStateDescriptor<T> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBValueState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				this);
-	}
-
-	@Override
-	protected <N, T> InternalListState<K, N, T> createListState(
-		TypeSerializer<N> namespaceSerializer,
-		ListStateDescriptor<T> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBListState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getElementSerializer(),
-				this);
-	}
-
-	@Override
-	protected <N, T> InternalReducingState<K, N, T> createReducingState(
-		TypeSerializer<N> namespaceSerializer,
-		ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBReducingState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getReduceFunction(),
-				this);
-	}
-
-	@Override
-	protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
-		TypeSerializer<N> namespaceSerializer,
-		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBAggregatingState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getAggregateFunction(),
-				this);
-	}
-
-	@Override
-	protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
+	public <N, SV, S extends State, IS extends S> IS createState( // single generic entry point replacing the removed per-type create methods
 		TypeSerializer<N> namespaceSerializer,
-		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBFoldingState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getFoldFunction(),
-				this);
-	}
-
-	@Override
-	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
-		TypeSerializer<N> namespaceSerializer,
-		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
-				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
-		return new RocksDBMapState<>(
-				registerResult.f0,
-				registerResult.f1.getNamespaceSerializer(),
-				registerResult.f1.getStateSerializer(),
-				stateDesc.getDefaultValue(),
-				this);
+		StateDescriptor<S, SV> stateDesc) throws Exception {
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); // lookup by the descriptor's runtime class
+		if (stateFactory == null) { // no factory is registered for this descriptor class
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult = // registration runs only after a factory was found
+			tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
+		return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this); // the factory builds the state bound to this backend
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 3b27316..03faa44 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,9 +19,14 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -46,9 +51,9 @@ import java.util.List;
  * @param <N> The type of the namespace.
  * @param <V> The type of the values in the list state.
  */
-public class RocksDBListState<K, N, V>
-		extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
-		implements InternalListState<K, N, V> {
+class RocksDBListState<K, N, V>
+	extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
+	implements InternalListState<K, N, V> {
 
 	/** Serializer for the values. */
 	private final TypeSerializer<V> elementSerializer;
@@ -68,7 +73,7 @@ public class RocksDBListState<K, N, V>
 	 * @param elementSerializer The serializer for elements of the list state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBListState(
+	private RocksDBListState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<List<V>> valueSerializer,
@@ -183,7 +188,12 @@ public class RocksDBListState<K, N, V>
 	}
 
 	@Override
-	public void update(List<V> values) {
+	public void update(List<V> valueToStore) {
+		updateInternal(valueToStore);
+	}
+
+	@Override
+	public void updateInternal(List<V> values) {
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 
 		clear();
@@ -244,8 +254,17 @@ public class RocksDBListState<K, N, V>
 		return keySerializationStream.toByteArray();
 	}
 
-	@Override
-	public void updateInternal(List<V> valueToStore) {
-		update(valueToStore);
+	@SuppressWarnings("unchecked")
+	static <E, K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBListState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(),
+			(TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
+			(List<E>) stateDesc.getDefaultValue(),
+			((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
+			backend);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 65d81ac..9d00a67 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -19,6 +19,8 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -39,6 +42,7 @@ import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
@@ -58,9 +62,9 @@ import java.util.Map;
  * @param <UK> The type of the keys in the map state.
  * @param <UV> The type of the values in the map state.
  */
-public class RocksDBMapState<K, N, UK, UV>
-		extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
-		implements InternalMapState<K, N, UK, UV> {
+class RocksDBMapState<K, N, UK, UV>
+	extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
+	implements InternalMapState<K, N, UK, UV> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
 
@@ -77,7 +81,7 @@ public class RocksDBMapState<K, N, UK, UV>
 	 * @param defaultValue The default value for the state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBMapState(
+	private RocksDBMapState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<Map<UK, UV>> valueSerializer,
@@ -160,60 +164,45 @@ public class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
+	public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
 		final Iterator<Map.Entry<UK, UV>> iterator = iterator();
 
 		// Return null to make the behavior consistent with other states.
 		if (!iterator.hasNext()) {
 			return null;
 		} else {
-			return new Iterable<Map.Entry<UK, UV>>() {
-				@Override
-				public Iterator<Map.Entry<UK, UV>> iterator() {
-					return iterator;
-				}
-			};
+			return () -> iterator;
 		}
 	}
 
 	@Override
-	public Iterable<UK> keys() throws IOException, RocksDBException {
+	public Iterable<UK> keys() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return new Iterable<UK>() {
+		return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
 			@Override
-			public Iterator<UK> iterator() {
-				return new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
-					@Override
-					public UK next() {
-						RocksDBMapEntry entry = nextEntry();
-						return (entry == null ? null : entry.getKey());
-					}
-				};
+			public UK next() {
+				RocksDBMapEntry entry = nextEntry();
+				return (entry == null ? null : entry.getKey());
 			}
 		};
 	}
 
 	@Override
-	public Iterable<UV> values() throws IOException, RocksDBException {
+	public Iterable<UV> values() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return new Iterable<UV>() {
+		return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
 			@Override
-			public Iterator<UV> iterator() {
-				return new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
-					@Override
-					public UV next() {
-						RocksDBMapEntry entry = nextEntry();
-						return (entry == null ? null : entry.getValue());
-					}
-				};
+			public UV next() {
+				RocksDBMapEntry entry = nextEntry();
+				return (entry == null ? null : entry.getValue());
 			}
 		};
 	}
 
 	@Override
-	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
+	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
 		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@@ -305,12 +294,7 @@ public class RocksDBMapState<K, N, UK, UV>
 			return null;
 		}
 
-		return KvStateSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
-			@Override
-			public Iterator<Map.Entry<UK, UV>> iterator() {
-				return iterator;
-			}
-		}, dupUserKeySerializer, dupUserValueSerializer);
+		return KvStateSerializer.serializeMap(() -> iterator, dupUserKeySerializer, dupUserValueSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -416,7 +400,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		RocksDBMapEntry(
 				@Nonnull final RocksDB db,
-				@Nonnull final int userKeyOffset,
+				@Nonnegative final int userKeyOffset,
 				@Nonnull final byte[] rawKeyBytes,
 				@Nonnull final byte[] rawValueBytes,
 				@Nonnull final TypeSerializer<UK> keySerializer,
@@ -628,4 +612,17 @@ public class RocksDBMapState<K, N, UK, UV>
 			}
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBMapState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(),
+			(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
+			(Map<UK, UV>) stateDesc.getDefaultValue(),
+			backend);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ead40b2..d138045 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -20,9 +20,14 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -38,9 +43,9 @@ import java.util.Collection;
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
  */
-public class RocksDBReducingState<K, N, V>
-		extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
-		implements InternalReducingState<K, N, V> {
+class RocksDBReducingState<K, N, V>
+	extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
+	implements InternalReducingState<K, N, V> {
 
 	/** User-specified reduce function. */
 	private final ReduceFunction<V> reduceFunction;
@@ -55,7 +60,7 @@ public class RocksDBReducingState<K, N, V>
 	 * @param reduceFunction The reduce function used for reducing state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBReducingState(ColumnFamilyHandle columnFamily,
+	private RocksDBReducingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<V> valueSerializer,
 			V defaultValue,
@@ -163,4 +168,18 @@ public class RocksDBReducingState<K, N, V>
 			throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBReducingState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(),
+			registerResult.f1.getStateSerializer(),
+			stateDesc.getDefaultValue(),
+			((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction(),
+			backend);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 92e9a34..2b60fc1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -38,9 +42,9 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
  */
-public class RocksDBValueState<K, N, V>
-		extends AbstractRocksDBState<K, N, V, ValueState<V>>
-		implements InternalValueState<K, N, V> {
+class RocksDBValueState<K, N, V>
+	extends AbstractRocksDBState<K, N, V, ValueState<V>>
+	implements InternalValueState<K, N, V> {
 
 	/**
 	 * Creates a new {@code RocksDBValueState}.
@@ -51,7 +55,7 @@ public class RocksDBValueState<K, N, V>
 	 * @param defaultValue The default value for the state.
 	 * @param backend The backend for which this state is bind to.
 	 */
-	public RocksDBValueState(
+	private RocksDBValueState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<V> valueSerializer,
@@ -92,7 +96,7 @@ public class RocksDBValueState<K, N, V>
 	}
 
 	@Override
-	public void update(V value) throws IOException {
+	public void update(V value) {
 		if (value == null) {
 			clear();
 			return;
@@ -108,4 +112,17 @@ public class RocksDBValueState<K, N, V>
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		RocksDBKeyedStateBackend<K> backend) {
+		return (IS) new RocksDBValueState<>(
+			registerResult.f0,
+			registerResult.f1.getNamespaceSerializer(),
+			registerResult.f1.getStateSerializer(),
+			stateDesc.getDefaultValue(),
+			backend);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
index 86faea3..6b079d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBMapState;
 import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.util.TestLogger;
@@ -36,7 +35,6 @@ import org.rocksdb.WriteOptions;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Test that validates that the performance of RocksDB's WriteBatch as expected.
@@ -63,7 +61,7 @@ import java.util.Map;
  * restoring from savepoint, because we need to set disableWAL=true to avoid segfault bug, see FLINK-8859 for detail).
  *
  * <p>Write gives user 1.5x performance improvements when disableWAL is true, this is useful for batch writing scenario,
- * e.g. {@link RocksDBMapState#putAll(Map)} & {@link RocksDBMapState#clear()}.
+ * e.g. RocksDBMapState.putAll(Map) & RocksDBMapState.clear().
  */
 public class RocksDBWriteBatchPerformanceTest extends TestLogger {
 


[2/2] flink git commit: [FLINK-9571] Replace StateBinder with internal backend-specific state factories

Posted by sr...@apache.org.
[FLINK-9571] Replace StateBinder with internal backend-specific state factories

This closes #6173.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bdde837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bdde837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bdde837

Branch: refs/heads/master
Commit: 0bdde8377c254195fe94709d639bf03f9bd77606
Parents: 0e9b066
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Wed Jun 13 10:24:10 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 14:41:50 2018 +0200

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptor.java       |   7 -
 .../common/state/FoldingStateDescriptor.java    |   7 -
 .../api/common/state/ListStateDescriptor.java   |   7 -
 .../api/common/state/MapStateDescriptor.java    |   5 -
 .../common/state/ReducingStateDescriptor.java   |   7 -
 .../flink/api/common/state/StateBinder.java     |  85 ----------
 .../flink/api/common/state/StateDescriptor.java |  10 +-
 .../api/common/state/ValueStateDescriptor.java  |   7 -
 .../api/common/state/StateDescriptorTest.java   |  10 --
 .../client/QueryableStateClient.java            |  57 ++++++-
 .../client/state/ImmutableAggregatingState.java |  22 +--
 .../client/state/ImmutableFoldingState.java     |  18 +-
 .../client/state/ImmutableListState.java        |  28 ++--
 .../client/state/ImmutableMapState.java         |  31 ++--
 .../client/state/ImmutableReducingState.java    |  18 +-
 .../client/state/ImmutableStateBinder.java      |  80 ---------
 .../client/state/ImmutableValueState.java       |  18 +-
 .../state/ImmutableAggregatingStateTest.java    |   7 +-
 .../client/state/ImmutableFoldingStateTest.java |   7 +-
 .../client/state/ImmutableListStateTest.java    |   9 +-
 .../client/state/ImmutableMapStateTest.java     |  19 ++-
 .../state/ImmutableReducingStateTest.java       |   7 +-
 .../client/state/ImmutableValueStateTest.java   |   8 +-
 .../KVStateRequestSerializerRocksDBTest.java    |  50 +-----
 .../network/KvStateRequestSerializerTest.java   |   6 +-
 .../state/AbstractKeyedStateBackend.java        | 164 +++----------------
 .../state/heap/HeapAggregatingState.java        |  24 ++-
 .../runtime/state/heap/HeapFoldingState.java    |  20 ++-
 .../state/heap/HeapKeyedStateBackend.java       | 125 ++++----------
 .../flink/runtime/state/heap/HeapListState.java |  18 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  21 ++-
 .../runtime/state/heap/HeapReducingState.java   |  31 +++-
 .../runtime/state/heap/HeapValueState.java      |  17 +-
 .../runtime/state/StateBackendTestBase.java     |   8 +-
 .../state/StateSnapshotCompressionTest.java     |   6 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   6 +-
 .../state/RocksDBAggregatingState.java          |  27 ++-
 .../streaming/state/RocksDBFoldingState.java    |  27 ++-
 .../state/RocksDBKeyedStateBackend.java         | 132 ++++-----------
 .../streaming/state/RocksDBListState.java       |  35 +++-
 .../streaming/state/RocksDBMapState.java        |  75 ++++-----
 .../streaming/state/RocksDBReducingState.java   |  27 ++-
 .../streaming/state/RocksDBValueState.java      |  27 ++-
 .../RocksDBWriteBatchPerformanceTest.java       |   4 +-
 44 files changed, 510 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 8c7fed6..1197ed2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -93,13 +93,6 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 		this.aggFunction = checkNotNull(aggFunction);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createAggregatingState(this);
-	}
-
 	/**
 	 * Returns the aggregate function to be used for the state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index c14e4bf..392c04c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,13 +97,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 		}
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createFoldingState(this);
-	}
-
 	/**
 	 * Returns the fold function to be used for the folding state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index aa5e64b..0016c22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -76,13 +76,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 		super(name, new ListSerializer<>(typeSerializer), null);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ListState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createListState(this);
-	}
-
 	/**
 	 * Gets the serializer for the elements contained in the list.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 42b016a..6eb8ddc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -81,11 +81,6 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 	}
 
 	@Override
-	public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createMapState(this);
-	}
-
-	@Override
 	public Type getType() {
 		return Type.MAP;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 0df1c2c..07b22c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -83,13 +83,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 		this.reduceFunction = checkNotNull(reduceFunction);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createReducingState(this);
-	}
-
 	/**
 	 * Returns the reduce function to be used for the reducing state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
deleted file mode 100644
index 871b4a8..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
- * {@link State} objects.
- */
-@Internal
-public interface StateBinder {
-
-	/**
-	 * Creates and returns a new {@link ValueState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the value that the {@code ValueState} can store.
-	 */
-	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ReducingState} can store.
-	 */
-	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <IN> The type of the values that go into the aggregating state
-	 * @param <ACC> The type of the values that are stored in the aggregating state
-	 * @param <OUT> The type of the values that come out of the aggregating state
-	 */
-	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
-			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 *
-	 * @deprecated will be removed in a future version in favor of {@link AggregatingState}
-	 */
-	@Deprecated
-	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link MapState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <MK> Type of the keys in the state
-	 * @param <MV> Type of the values in the state
-	 */
-	<MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 9b6b51d..6c54e71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -41,8 +41,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
- * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
+ * {@link State} in stateful operations.
  *
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
  *
@@ -231,13 +230,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 		return queryableStateName != null;
 	}
 
-	/**
-	 * Creates a new {@link State} on the given {@link StateBinder}.
-	 *
-	 * @param stateBinder The {@code StateBackend} on which to create the {@link State}.
-	 */
-	public abstract S bind(StateBinder stateBinder) throws Exception;
-
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 4d69d81..d2719aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -122,13 +122,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 		super(name, typeSerializer, null);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ValueState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createValueState(this);
-	}
-
 	@Override
 	public Type getType() {
 		return Type.VALUE;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 3958baa..4346163 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -221,11 +221,6 @@ public class StateDescriptorTest {
 		}
 
 		@Override
-		public State bind(StateBinder stateBinder) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public Type getType() {
 			return Type.VALUE;
 		}
@@ -248,11 +243,6 @@ public class StateDescriptorTest {
 		}
 
 		@Override
-		public State bind(StateBinder stateBinder) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public Type getType() {
 			return Type.VALUE;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 2a6baf0..470c7ac 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,13 +21,25 @@ package org.apache.flink.queryablestate.client;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.FutureUtils;
-import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
@@ -44,7 +56,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Client for querying Flink's managed state.
@@ -67,6 +82,20 @@ public class QueryableStateClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
 
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) ImmutableValueState::createState),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) ImmutableListState::createState),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) ImmutableMapState::createState),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) ImmutableAggregatingState::createState),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) ImmutableReducingState::createState),
+			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) ImmutableFoldingState::createState)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private interface StateFactory {
+		<T, S extends State> S createState(StateDescriptor<S, T> stateDesc, byte[] serializedState) throws Exception;
+	}
+
 	/** The client that forwards the requests to the proxy. */
 	private final Client<KvStateRequest, KvStateResponse> client;
 
@@ -241,14 +270,24 @@ public class QueryableStateClient {
 			return FutureUtils.getFailedFuture(e);
 		}
 
-		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
-				stateResponse -> {
-					try {
-						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
-					} catch (Exception e) {
-						throw new FlinkRuntimeException(e);
-					}
-				});
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+			.thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
+	}
+
+	private <T, S extends State> S createState(
+		KvStateResponse stateResponse,
+		StateDescriptor<S, T> stateDescriptor) {
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDescriptor.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		try {
+			return stateFactory.createState(stateDescriptor, stateResponse.getContent());
+		} catch (Exception e) {
+			throw new FlinkRuntimeException(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
index 8964fbf..a83da54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link AggregatingStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
 
 	private final OUT value;
@@ -57,15 +57,15 @@ public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState imp
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
-			final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
-			final byte[] serializedValue) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <OUT, ACC, S extends State> S createState(
+		StateDescriptor<S, ACC> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final ACC accumulator = KvStateSerializer.deserializeValue(
-				serializedValue,
-				stateDescriptor.getSerializer());
-
-		final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
-		return new ImmutableAggregatingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		final OUT state = ((AggregatingStateDescriptor<?, ACC, OUT>) stateDescriptor).
+			getAggregateFunction().getResult(accumulator);
+		return (S) new ImmutableAggregatingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
index 25f3118..16a94c6 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link FoldingStateDescriptor}.
  */
-@PublicEvolving
 @Deprecated
 public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
 
@@ -58,13 +58,13 @@ public final class ImmutableFoldingState<IN, ACC> extends ImmutableState impleme
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
-			final FoldingStateDescriptor<IN, ACC> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <ACC, S extends State> S createState(
+		StateDescriptor<S, ACC> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final ACC state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableFoldingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableFoldingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465e..0c86ecf 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +35,6 @@ import java.util.List;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ListStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
 
 	private final List<V> listState;
@@ -58,23 +58,23 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableListState<V> createState(
-			final ListStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
-		final List<V> state = KvStateSerializer.deserializeList(
-				serializedState,
-				stateDescriptor.getElementSerializer());
-		return new ImmutableListState<>(state);
-	}
-
 	@Override
-	public void update(List<V> values) throws Exception {
+	public void update(List<V> values) {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
 	@Override
-	public void addAll(List<V> values) throws Exception {
+	public void addAll(List<V> values) {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
+
+	@SuppressWarnings("unchecked")
+	public static <V, T, S extends State> S createState(
+		StateDescriptor<S, T> stateDescriptor,
+		byte[] serializedState) throws IOException {
+		final List<V> state = KvStateSerializer.deserializeList(
+			serializedState,
+			((ListStateDescriptor<V>) stateDescriptor).getElementSerializer());
+		return (S) new ImmutableListState<>(state);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index bb08cf0..4d51b7d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -38,7 +39,6 @@ import java.util.Set;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link MapStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
 
 	private final Map<K, V> state;
@@ -76,8 +76,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}.
 	 *
 	 * @return A read-only iterable view of all the key-value pairs in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<Map.Entry<K, V>> entries() {
@@ -88,8 +86,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
 	 *
 	 * @return A read-only iterable view of all the keys in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<K> keys() {
@@ -100,8 +96,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
 	 *
 	 * @return A read-only iterable view of all the values in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<V> values() {
@@ -112,9 +106,7 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Iterates over all the mappings in the state. The iterator cannot
 	 * remove elements.
 	 *
-	 * @return A read-only iterator over all the mappings in the state
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @return A read-only iterator over all the mappings in the state.
 	 */
 	@Override
 	public Iterator<Map.Entry<K, V>> iterator() {
@@ -126,14 +118,15 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <K, V> ImmutableMapState<K, V> createState(
-			final MapStateDescriptor<K, V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <K, V, T, S extends State> S createState(
+		StateDescriptor<S, T> stateDescriptor,
+		byte[] serializedState) throws IOException {
+		MapStateDescriptor<K, V> mapStateDescriptor = (MapStateDescriptor<K, V>) stateDescriptor;
 		final Map<K, V> state = KvStateSerializer.deserializeMap(
-				serializedState,
-				stateDescriptor.getKeySerializer(),
-				stateDescriptor.getValueSerializer());
-		return new ImmutableMapState<>(state);
+			serializedState,
+			mapStateDescriptor.getKeySerializer(),
+			mapStateDescriptor.getValueSerializer());
+		return (S) new ImmutableMapState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
index 46b477f..a9990b0 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ReducingStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
 
 	private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableReducingState<V> extends ImmutableState implements R
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableReducingState<V> createState(
-			final ReducingStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <V, S extends State> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final V state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableReducingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableReducingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
deleted file mode 100644
index 6ce2787..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link StateBinder} used to deserialize the results returned by the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
- *
- * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State}
- * object containing the requested result.
- */
-public class ImmutableStateBinder implements StateBinder {
-
-	private final byte[] serializedState;
-
-	public ImmutableStateBinder(final byte[] content) {
-		serializedState = Preconditions.checkNotNull(content);
-	}
-
-	@Override
-	public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableValueState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableListState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableReducingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
-		return ImmutableAggregatingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-		return ImmutableFoldingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception {
-		return ImmutableMapState.createState(stateDesc, serializedState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
index f3ddd2b..3a5cab4 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ValueStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
 
 	private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableValueState<V> extends ImmutableState implements Valu
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableValueState<V> createState(
-			final ValueStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <V, S extends State> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final V state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableValueState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableValueState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
index ebbc896..955bf38 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
@@ -41,7 +42,7 @@ public class ImmutableAggregatingStateTest {
 					new SumAggr(),
 					String.class);
 
-	private ImmutableAggregatingState<Long, String> aggrState;
+	private AggregatingState<Long, String> aggrState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableAggregatingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		String value = aggrState.get();
 		assertEquals("42", value);
 
@@ -69,7 +70,7 @@ public class ImmutableAggregatingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		String value = aggrState.get();
 		assertEquals("42", value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
index 9e8dfc9..2803d23 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -43,7 +44,7 @@ public class ImmutableFoldingStateTest {
 					new SumFold(),
 					StringSerializer.INSTANCE);
 
-	private ImmutableFoldingState<Long, String> foldingState;
+	private FoldingState<Long, String> foldingState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableFoldingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		String value = foldingState.get();
 		assertEquals("42", value);
 
@@ -69,7 +70,7 @@ public class ImmutableFoldingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		String value = foldingState.get();
 		assertEquals("42", value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
index a78ed1f..19b8514 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,7 +43,7 @@ public class ImmutableListStateTest {
 	private final ListStateDescriptor<Long> listStateDesc =
 			new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableListState<Long> listState;
+	private ListState<Long> listState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -58,7 +59,7 @@ public class ImmutableListStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		List<Long> list = getStateContents();
 		assertEquals(1L, list.size());
 
@@ -69,7 +70,7 @@ public class ImmutableListStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		List<Long> list = getStateContents();
 		assertEquals(1L, list.size());
 
@@ -100,7 +101,7 @@ public class ImmutableListStateTest {
 		return baos.toByteArray();
 	}
 
-	private List<Long> getStateContents() {
+	private List<Long> getStateContents() throws Exception {
 		List<Long> list = new ArrayList<>();
 		for (Long elem: listState.get()) {
 			list.add(elem);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index ffeabae..6465257 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -44,7 +45,7 @@ public class ImmutableMapStateTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableMapState<Long, Long> mapState;
+	private MapState<Long, Long> mapState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -65,7 +66,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testPut() {
+	public void testPut() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -78,7 +79,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testPutAll() {
+	public void testPutAll() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -95,7 +96,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -108,7 +109,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testIterator() {
+	public void testIterator() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -124,7 +125,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testIterable() {
+	public void testIterable() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -142,7 +143,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testKeys() {
+	public void testKeys() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -158,7 +159,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testValues() {
+	public void testValues() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -174,7 +175,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
index 9694f55..543f714 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
@@ -38,7 +39,7 @@ public class ImmutableReducingStateTest {
 	private final ReducingStateDescriptor<Long> reducingStateDesc =
 			new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableReducingState<Long> reduceState;
+	private ReducingState<Long> reduceState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -53,7 +54,7 @@ public class ImmutableReducingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		long value = reduceState.get();
 		assertEquals(42L, value);
 
@@ -61,7 +62,7 @@ public class ImmutableReducingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		long value = reduceState.get();
 		assertEquals(42L, value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
index a0da43d..75f8f03 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
@@ -37,7 +39,7 @@ public class ImmutableValueStateTest {
 	private final ValueStateDescriptor<Long> valueStateDesc =
 			new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableValueState<Long> valueState;
+	private ValueState<Long> valueState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -52,7 +54,7 @@ public class ImmutableValueStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws IOException {
 		long value = valueState.value();
 		assertEquals(42L, value);
 
@@ -60,7 +62,7 @@ public class ImmutableValueStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws IOException {
 		long value = valueState.value();
 		assertEquals(42L, value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 6ee7631..a49fdd2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.PredefinedOptions;
@@ -40,8 +39,6 @@ import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import java.io.File;
-
 import static org.mockito.Mockito.mock;
 
 /**
@@ -54,43 +51,6 @@ public final class KVStateRequestSerializerRocksDBTest {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
-	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
-	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
-	 * the tests.
-	 *
-	 * @param <K> key type
-	 */
-	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
-		RocksDBKeyedStateBackend2(
-				final String operatorIdentifier,
-				final ClassLoader userCodeClassLoader,
-				final File instanceBasePath,
-				final DBOptions dbOptions,
-				final ColumnFamilyOptions columnFamilyOptions,
-				final TaskKvStateRegistry kvStateRegistry,
-				final TypeSerializer<K> keySerializer,
-				final int numberOfKeyGroups,
-				final KeyGroupRange keyGroupRange,
-				final ExecutionConfig executionConfig) throws Exception {
-
-			super(operatorIdentifier, userCodeClassLoader,
-				instanceBasePath,
-				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
-				numberOfKeyGroups, keyGroupRange, executionConfig, false,
-				TestLocalRecoveryConfig.disabled());
-		}
-
-		@Override
-		public <N, T> InternalListState<K, N, T> createListState(
-			final TypeSerializer<N> namespaceSerializer,
-			final ListStateDescriptor<T> stateDesc) throws Exception {
-
-			return super.createListState(namespaceSerializer, stateDesc);
-		}
-	}
-
-	/**
 	 * Tests list serialization and deserialization match.
 	 *
 	 * @see KvStateRequestSerializerTest#testListSerialization()
@@ -105,8 +65,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
 		dbOptions.setCreateIfMissing(true);
 		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
-		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
-			new RocksDBKeyedStateBackend2<>(
+		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend<>(
 				"no-op",
 				ClassLoader.getSystemClassLoader(),
 				temporaryFolder.getRoot(),
@@ -115,13 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
 				1, new KeyGroupRange(0, 0),
-				new ExecutionConfig()
+				new ExecutionConfig(), false,
+				TestLocalRecoveryConfig.disabled()
 			);
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
-			.createListState(VoidNamespaceSerializer.INSTANCE,
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
 		KvStateRequestSerializerTest.testListSerialization(key, listState);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 1dc7186..2ba7507 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,9 +200,9 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
-				VoidNamespaceSerializer.INSTANCE,
-				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+			VoidNamespaceSerializer.INSTANCE,
+			new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
 		testListSerialization(key, listState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f873655..1690240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -20,32 +20,13 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -73,30 +54,30 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	protected final TypeSerializer<K> keySerializer;
 
 	/** The currently active key. */
-	protected K currentKey;
+	private K currentKey;
 
-	/** The key group of the currently active key */
+	/** The key group of the currently active key. */
 	private int currentKeyGroup;
 
 	/** So that we can give out state when the user uses the same key. */
-	protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+	private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
 
-	/** For caching the last accessed partitioned state */
+	/** For caching the last accessed partitioned state. */
 	private String lastName;
 
 	@SuppressWarnings("rawtypes")
 	private InternalKvState lastState;
 
-	/** The number of key-groups aka max parallelism */
+	/** The number of key-groups aka max parallelism. */
 	protected final int numberOfKeyGroups;
 
-	/** Range of key-groups for which this backend is responsible */
+	/** Range of key-groups for which this backend is responsible. */
 	protected final KeyGroupRange keyGroupRange;
 
-	/** KvStateRegistry helper for this task */
+	/** KvStateRegistry helper for this task. */
 	protected final TaskKvStateRegistry kvStateRegistry;
 
-	/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+	/** Registry for all opened streams, so they can be closed if the task using this backend is closed. */
 	protected CloseableRegistry cancelStreamRegistry;
 
 	protected final ClassLoader userCodeClassLoader;
@@ -153,87 +134,19 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	}
 
 	/**
-	 * Creates and returns a new {@link ValueState}.
+	 * Creates and returns a new {@link State}.
 	 *
 	 * @param namespaceSerializer TypeSerializer for the state namespace.
 	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
 	 *
 	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 * @param <SV> The type of the stored state value.
+	 * @param <S> The type of the public API state.
+	 * @param <IS> The type of internal state.
 	 */
-	protected abstract <N, T> InternalValueState<K, N, T> createValueState(
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> InternalListState<K, N, T> createListState(
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
-			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 *
-	 * @deprecated will be removed in a future version
-	 */
-	@Deprecated
-	protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link MapState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <UK> Type of the keys in the state
-	 * @param <UV> Type of the values in the state	 *
-	 */
-	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
-			TypeSerializer<N> namespaceSerializer,
-			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+	public abstract <N, SV, S extends State, IS extends S> IS createState(
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception;
 
 	/**
 	 * @see KeyedStateBackend
@@ -311,8 +224,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
 					throw new RuntimeException(e);
 				}
 			});
-		} catch (RuntimeException e) {
-			throw e;
 		}
 	}
 
@@ -320,6 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @see KeyedStateBackend
 	 */
 	@Override
+	@SuppressWarnings("unchecked")
 	public <N, S extends State, V> S getOrCreateKeyedState(
 			final TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, V> stateDescriptor) throws Exception {
@@ -343,43 +255,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			return typedState;
 		}
 
-		// create a new blank key/value state
-		S state = stateDescriptor.bind(new StateBinder() {
-			@Override
-			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
-					AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
-			}
-
-		});
-
-		@SuppressWarnings("unchecked")
-		InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
+		InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
 		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
 
 		// Publish queryable state
@@ -392,14 +268,14 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
 		}
 
-		return state;
+		return (S) kvState;
 	}
 
 	/**
 	 * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
 	 *       This method should be removed for the sake of namespaces being lazily fetched from the keyed
 	 *       state backend, or being set on the state directly.
-	 * 
+	 *
 	 * @see KeyedStateBackend
 	 */
 	@SuppressWarnings("unchecked")
@@ -445,7 +321,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	}
 
 	@VisibleForTesting
-	public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+	StreamCompressionDecorator getKeyGroupCompressionDecorator() {
 		return keyGroupCompressionDecorator;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 97d1ad0..62aa30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -40,7 +43,6 @@ import java.io.IOException;
 class HeapAggregatingState<K, N, IN, ACC, OUT>
 	extends AbstractHeapMergingState<K, N, IN, ACC, OUT>
 	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
-
 	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
 
 	/**
@@ -53,7 +55,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 	 * @param defaultValue The default value for the state.
 	 * @param aggregateFunction The aggregating function used for aggregating state.
 	 */
-	HeapAggregatingState(
+	private HeapAggregatingState(
 		StateTable<K, N, ACC> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<ACC> valueSerializer,
@@ -111,7 +113,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected ACC mergeState(ACC a, ACC b) throws Exception {
+	protected ACC mergeState(ACC a, ACC b) {
 		return aggregateTransformation.aggFunction.merge(a, b);
 	}
 
@@ -124,11 +126,25 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 		}
 
 		@Override
-		public ACC apply(ACC accumulator, IN value) throws Exception {
+		public ACC apply(ACC accumulator, IN value) {
 			if (accumulator == null) {
 				accumulator = aggFunction.createAccumulator();
 			}
 			return aggFunction.add(value, accumulator);
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <T, K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		StateTable<K, N, SV> stateTable,
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapAggregatingState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			stateDesc.getDefaultValue(),
+			((AggregatingStateDescriptor<T, SV, ?>) stateDesc).getAggregateFunction());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index d61c4c5..1bca719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -41,7 +44,6 @@ import java.io.IOException;
 class HeapFoldingState<K, N, T, ACC>
 	extends AbstractHeapAppendingState<K, N, T, ACC, ACC>
 	implements InternalFoldingState<K, N, T, ACC> {
-
 	/** The function used to fold the state. */
 	private final FoldTransformation foldTransformation;
 
@@ -55,7 +57,7 @@ class HeapFoldingState<K, N, T, ACC>
 	 * @param defaultValue The default value for the state.
 	 * @param foldFunction The fold function used for folding state.
 	 */
-	HeapFoldingState(
+	private HeapFoldingState(
 		StateTable<K, N, ACC> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<ACC> valueSerializer,
@@ -118,4 +120,18 @@ class HeapFoldingState<K, N, T, ACC>
 			return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
 		}
 	}
+
+	@SuppressWarnings("unchecked") // covers the unchecked casts to IS and to FoldingStateDescriptor below
+	static <K, N, SV, S extends State, IS extends S> IS create( // static factory for this state; the constructor is private
+		StateDescriptor<S, SV> stateDesc,
+		StateTable<K, N, SV> stateTable,
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapFoldingState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(), // value serializer is taken from the state table
+			stateTable.getNamespaceSerializer(), // namespace serializer is taken from the state table
+			stateDesc.getDefaultValue(),
+			((FoldingStateDescriptor<SV, SV>) stateDesc).getFoldFunction()); // NOTE(review): both type arguments are SV here; the removed createFoldingState used <T, ACC> - presumably harmless under erasure, confirm intent
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 76479b0..82ce584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -59,12 +59,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -97,6 +92,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
 
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = // dispatch table: descriptor class -> factory of the matching heap state
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
+			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); // key = descriptor class (f0), value = factory (f1)
+
+	private interface StateFactory { // single-method interface; the static create(...) methods of the heap states are used as its implementations
+		<K, N, SV, S extends State, IS extends S> IS createState( // builds the state object for a descriptor and its state table
+			StateDescriptor<S, SV> stateDesc,
+			StateTable<K, N, SV> stateTable,
+			TypeSerializer<K> keySerializer) throws Exception;
+	}
+
 	/**
 	 * Map of state tables that stores all state of key/value states. We store it centrally so
 	 * that we can easily checkpoint/restore it.
@@ -110,8 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Map of state names to their corresponding restored state meta info.
 	 *
-	 * <p>
-	 * TODO this map can be removed when eager-state registration is in place.
+	 * <p>TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
 	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -203,91 +214,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, V> InternalValueState<K, N, V> createValueState(
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<V> stateDesc) throws Exception {
-
-		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapValueState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
-	}
-
-	@Override
-	public <N, T> InternalListState<K, N, T> createListState(
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception {
-
-		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapListState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
-	}
-
-	@Override
-	public <N, T> InternalReducingState<K, N, T> createReducingState(
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapReducingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getReduceFunction());
-	}
-
-	@Override
-	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
-			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
-		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapAggregatingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getAggregateFunction());
-	}
-
-	@Override
-	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
-		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapFoldingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getFoldFunction());
-	}
-
-	@Override
-	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
-			TypeSerializer<N> namespaceSerializer,
-			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
-		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-
-		return new HeapMapState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
+	public <N, SV, S extends State, IS extends S> IS createState( // single entry point replacing the removed per-type create*State methods
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception {
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); // lookup uses the descriptor's exact runtime class, so a subclass of a registered descriptor finds no entry
+		if (stateFactory == null) { // no factory is registered for this descriptor class
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); // reached only after the factory lookup succeeded
+		return stateFactory.createState(stateDesc, stateTable, keySerializer);
 	}
 
 	@Override
@@ -411,7 +348,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					int writtenKeyGroupIndex = inView.readInt();
 
 					try (InputStream kgCompressionInStream =
-							 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+							streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
 
 						DataInputViewStreamWrapper kgCompressionInView =
 							new DataInputViewStreamWrapper(kgCompressionInStream);