You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 12:42:26 UTC

[2/2] flink git commit: [FLINK-9571] Repace StateBinder with internal backend-specific state factories

[FLINK-9571] Repace StateBinder with internal backend-specific state factories

This closes #6173.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bdde837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bdde837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bdde837

Branch: refs/heads/master
Commit: 0bdde8377c254195fe94709d639bf03f9bd77606
Parents: 0e9b066
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Wed Jun 13 10:24:10 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 14:41:50 2018 +0200

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptor.java       |   7 -
 .../common/state/FoldingStateDescriptor.java    |   7 -
 .../api/common/state/ListStateDescriptor.java   |   7 -
 .../api/common/state/MapStateDescriptor.java    |   5 -
 .../common/state/ReducingStateDescriptor.java   |   7 -
 .../flink/api/common/state/StateBinder.java     |  85 ----------
 .../flink/api/common/state/StateDescriptor.java |  10 +-
 .../api/common/state/ValueStateDescriptor.java  |   7 -
 .../api/common/state/StateDescriptorTest.java   |  10 --
 .../client/QueryableStateClient.java            |  57 ++++++-
 .../client/state/ImmutableAggregatingState.java |  22 +--
 .../client/state/ImmutableFoldingState.java     |  18 +-
 .../client/state/ImmutableListState.java        |  28 ++--
 .../client/state/ImmutableMapState.java         |  31 ++--
 .../client/state/ImmutableReducingState.java    |  18 +-
 .../client/state/ImmutableStateBinder.java      |  80 ---------
 .../client/state/ImmutableValueState.java       |  18 +-
 .../state/ImmutableAggregatingStateTest.java    |   7 +-
 .../client/state/ImmutableFoldingStateTest.java |   7 +-
 .../client/state/ImmutableListStateTest.java    |   9 +-
 .../client/state/ImmutableMapStateTest.java     |  19 ++-
 .../state/ImmutableReducingStateTest.java       |   7 +-
 .../client/state/ImmutableValueStateTest.java   |   8 +-
 .../KVStateRequestSerializerRocksDBTest.java    |  50 +-----
 .../network/KvStateRequestSerializerTest.java   |   6 +-
 .../state/AbstractKeyedStateBackend.java        | 164 +++----------------
 .../state/heap/HeapAggregatingState.java        |  24 ++-
 .../runtime/state/heap/HeapFoldingState.java    |  20 ++-
 .../state/heap/HeapKeyedStateBackend.java       | 125 ++++----------
 .../flink/runtime/state/heap/HeapListState.java |  18 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  21 ++-
 .../runtime/state/heap/HeapReducingState.java   |  31 +++-
 .../runtime/state/heap/HeapValueState.java      |  17 +-
 .../runtime/state/StateBackendTestBase.java     |   8 +-
 .../state/StateSnapshotCompressionTest.java     |   6 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   6 +-
 .../state/RocksDBAggregatingState.java          |  27 ++-
 .../streaming/state/RocksDBFoldingState.java    |  27 ++-
 .../state/RocksDBKeyedStateBackend.java         | 132 ++++-----------
 .../streaming/state/RocksDBListState.java       |  35 +++-
 .../streaming/state/RocksDBMapState.java        |  75 ++++-----
 .../streaming/state/RocksDBReducingState.java   |  27 ++-
 .../streaming/state/RocksDBValueState.java      |  27 ++-
 .../RocksDBWriteBatchPerformanceTest.java       |   4 +-
 44 files changed, 510 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 8c7fed6..1197ed2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -93,13 +93,6 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 		this.aggFunction = checkNotNull(aggFunction);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createAggregatingState(this);
-	}
-
 	/**
 	 * Returns the aggregate function to be used for the state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index c14e4bf..392c04c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,13 +97,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 		}
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createFoldingState(this);
-	}
-
 	/**
 	 * Returns the fold function to be used for the folding state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index aa5e64b..0016c22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -76,13 +76,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 		super(name, new ListSerializer<>(typeSerializer), null);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ListState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createListState(this);
-	}
-
 	/**
 	 * Gets the serializer for the elements contained in the list.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 42b016a..6eb8ddc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -81,11 +81,6 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 	}
 
 	@Override
-	public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createMapState(this);
-	}
-
-	@Override
 	public Type getType() {
 		return Type.MAP;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 0df1c2c..07b22c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -83,13 +83,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 		this.reduceFunction = checkNotNull(reduceFunction);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createReducingState(this);
-	}
-
 	/**
 	 * Returns the reduce function to be used for the reducing state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
deleted file mode 100644
index 871b4a8..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
- * {@link State} objects.
- */
-@Internal
-public interface StateBinder {
-
-	/**
-	 * Creates and returns a new {@link ValueState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the value that the {@code ValueState} can store.
-	 */
-	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ReducingState} can store.
-	 */
-	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <IN> The type of the values that go into the aggregating state
-	 * @param <ACC> The type of the values that are stored in the aggregating state
-	 * @param <OUT> The type of the values that come out of the aggregating state
-	 */
-	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
-			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 *
-	 * @deprecated will be removed in a future version in favor of {@link AggregatingState}
-	 */
-	@Deprecated
-	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link MapState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <MK> Type of the keys in the state
-	 * @param <MV> Type of the values in the state
-	 */
-	<MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 9b6b51d..6c54e71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -41,8 +41,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
- * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
+ * {@link State} in stateful operations.
  *
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
  *
@@ -231,13 +230,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 		return queryableStateName != null;
 	}
 
-	/**
-	 * Creates a new {@link State} on the given {@link StateBinder}.
-	 *
-	 * @param stateBinder The {@code StateBackend} on which to create the {@link State}.
-	 */
-	public abstract S bind(StateBinder stateBinder) throws Exception;
-
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 4d69d81..d2719aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -122,13 +122,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 		super(name, typeSerializer, null);
 	}
 
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ValueState<T> bind(StateBinder stateBinder) throws Exception {
-		return stateBinder.createValueState(this);
-	}
-
 	@Override
 	public Type getType() {
 		return Type.VALUE;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 3958baa..4346163 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -221,11 +221,6 @@ public class StateDescriptorTest {
 		}
 
 		@Override
-		public State bind(StateBinder stateBinder) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public Type getType() {
 			return Type.VALUE;
 		}
@@ -248,11 +243,6 @@ public class StateDescriptorTest {
 		}
 
 		@Override
-		public State bind(StateBinder stateBinder) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public Type getType() {
 			return Type.VALUE;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 2a6baf0..470c7ac 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,13 +21,25 @@ package org.apache.flink.queryablestate.client;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.FutureUtils;
-import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
@@ -44,7 +56,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Client for querying Flink's managed state.
@@ -67,6 +82,20 @@ public class QueryableStateClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
 
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) ImmutableValueState::createState),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) ImmutableListState::createState),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) ImmutableMapState::createState),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) ImmutableAggregatingState::createState),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) ImmutableReducingState::createState),
+			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) ImmutableFoldingState::createState)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private interface StateFactory {
+		<T, S extends State> S createState(StateDescriptor<S, T> stateDesc, byte[] serializedState) throws Exception;
+	}
+
 	/** The client that forwards the requests to the proxy. */
 	private final Client<KvStateRequest, KvStateResponse> client;
 
@@ -241,14 +270,24 @@ public class QueryableStateClient {
 			return FutureUtils.getFailedFuture(e);
 		}
 
-		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
-				stateResponse -> {
-					try {
-						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
-					} catch (Exception e) {
-						throw new FlinkRuntimeException(e);
-					}
-				});
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+			.thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
+	}
+
+	private <T, S extends State> S createState(
+		KvStateResponse stateResponse,
+		StateDescriptor<S, T> stateDescriptor) {
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDescriptor.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		try {
+			return stateFactory.createState(stateDescriptor, stateResponse.getContent());
+		} catch (Exception e) {
+			throw new FlinkRuntimeException(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
index 8964fbf..a83da54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link AggregatingStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
 
 	private final OUT value;
@@ -57,15 +57,15 @@ public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState imp
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
-			final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
-			final byte[] serializedValue) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <OUT, ACC, S extends State> S createState(
+		StateDescriptor<S, ACC> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final ACC accumulator = KvStateSerializer.deserializeValue(
-				serializedValue,
-				stateDescriptor.getSerializer());
-
-		final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
-		return new ImmutableAggregatingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		final OUT state = ((AggregatingStateDescriptor<?, ACC, OUT>) stateDescriptor).
+			getAggregateFunction().getResult(accumulator);
+		return (S) new ImmutableAggregatingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
index 25f3118..16a94c6 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link FoldingStateDescriptor}.
  */
-@PublicEvolving
 @Deprecated
 public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
 
@@ -58,13 +58,13 @@ public final class ImmutableFoldingState<IN, ACC> extends ImmutableState impleme
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
-			final FoldingStateDescriptor<IN, ACC> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <ACC, S extends State> S createState(
+		StateDescriptor<S, ACC> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final ACC state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableFoldingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableFoldingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465e..0c86ecf 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +35,6 @@ import java.util.List;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ListStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
 
 	private final List<V> listState;
@@ -58,23 +58,23 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableListState<V> createState(
-			final ListStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
-		final List<V> state = KvStateSerializer.deserializeList(
-				serializedState,
-				stateDescriptor.getElementSerializer());
-		return new ImmutableListState<>(state);
-	}
-
 	@Override
-	public void update(List<V> values) throws Exception {
+	public void update(List<V> values) {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
 	@Override
-	public void addAll(List<V> values) throws Exception {
+	public void addAll(List<V> values) {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
+
+	@SuppressWarnings("unchecked")
+	public static <V, T, S extends State> S createState(
+		StateDescriptor<S, T> stateDescriptor,
+		byte[] serializedState) throws IOException {
+		final List<V> state = KvStateSerializer.deserializeList(
+			serializedState,
+			((ListStateDescriptor<V>) stateDescriptor).getElementSerializer());
+		return (S) new ImmutableListState<>(state);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index bb08cf0..4d51b7d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -38,7 +39,6 @@ import java.util.Set;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link MapStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
 
 	private final Map<K, V> state;
@@ -76,8 +76,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}.
 	 *
 	 * @return A read-only iterable view of all the key-value pairs in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<Map.Entry<K, V>> entries() {
@@ -88,8 +86,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
 	 *
 	 * @return A read-only iterable view of all the keys in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<K> keys() {
@@ -100,8 +96,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
 	 *
 	 * @return A read-only iterable view of all the values in the state.
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	@Override
 	public Iterable<V> values() {
@@ -112,9 +106,7 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	 * Iterates over all the mappings in the state. The iterator cannot
 	 * remove elements.
 	 *
-	 * @return A read-only iterator over all the mappings in the state
-	 *
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @return A read-only iterator over all the mappings in the state.
 	 */
 	@Override
 	public Iterator<Map.Entry<K, V>> iterator() {
@@ -126,14 +118,15 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <K, V> ImmutableMapState<K, V> createState(
-			final MapStateDescriptor<K, V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <K, V, T, S extends State> S createState(
+		StateDescriptor<S, T> stateDescriptor,
+		byte[] serializedState) throws IOException {
+		MapStateDescriptor<K, V> mapStateDescriptor = (MapStateDescriptor<K, V>) stateDescriptor;
 		final Map<K, V> state = KvStateSerializer.deserializeMap(
-				serializedState,
-				stateDescriptor.getKeySerializer(),
-				stateDescriptor.getValueSerializer());
-		return new ImmutableMapState<>(state);
+			serializedState,
+			mapStateDescriptor.getKeySerializer(),
+			mapStateDescriptor.getValueSerializer());
+		return (S) new ImmutableMapState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
index 46b477f..a9990b0 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ReducingStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
 
 	private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableReducingState<V> extends ImmutableState implements R
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableReducingState<V> createState(
-			final ReducingStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <V, S extends State> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final V state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableReducingState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableReducingState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
deleted file mode 100644
index 6ce2787..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link StateBinder} used to deserialize the results returned by the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
- *
- * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State}
- * object containing the requested result.
- */
-public class ImmutableStateBinder implements StateBinder {
-
-	private final byte[] serializedState;
-
-	public ImmutableStateBinder(final byte[] content) {
-		serializedState = Preconditions.checkNotNull(content);
-	}
-
-	@Override
-	public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableValueState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableListState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-		return ImmutableReducingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
-		return ImmutableAggregatingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-		return ImmutableFoldingState.createState(stateDesc, serializedState);
-	}
-
-	@Override
-	public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception {
-		return ImmutableMapState.createState(stateDesc, serializedState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
index f3ddd2b..3a5cab4 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.queryablestate.client.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -33,7 +34,6 @@ import java.io.IOException;
  * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
  * providing an {@link ValueStateDescriptor}.
  */
-@PublicEvolving
 public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
 
 	private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableValueState<V> extends ImmutableState implements Valu
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
-	public static <V> ImmutableValueState<V> createState(
-			final ValueStateDescriptor<V> stateDescriptor,
-			final byte[] serializedState) throws IOException {
-
+	@SuppressWarnings("unchecked")
+	public static <V, S extends State> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		byte[] serializedState) throws IOException {
 		final V state = KvStateSerializer.deserializeValue(
-				serializedState,
-				stateDescriptor.getSerializer());
-		return new ImmutableValueState<>(state);
+			serializedState,
+			stateDescriptor.getSerializer());
+		return (S) new ImmutableValueState<>(state);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
index ebbc896..955bf38 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
@@ -41,7 +42,7 @@ public class ImmutableAggregatingStateTest {
 					new SumAggr(),
 					String.class);
 
-	private ImmutableAggregatingState<Long, String> aggrState;
+	private AggregatingState<Long, String> aggrState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableAggregatingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		String value = aggrState.get();
 		assertEquals("42", value);
 
@@ -69,7 +70,7 @@ public class ImmutableAggregatingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		String value = aggrState.get();
 		assertEquals("42", value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
index 9e8dfc9..2803d23 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -43,7 +44,7 @@ public class ImmutableFoldingStateTest {
 					new SumFold(),
 					StringSerializer.INSTANCE);
 
-	private ImmutableFoldingState<Long, String> foldingState;
+	private FoldingState<Long, String> foldingState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableFoldingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		String value = foldingState.get();
 		assertEquals("42", value);
 
@@ -69,7 +70,7 @@ public class ImmutableFoldingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		String value = foldingState.get();
 		assertEquals("42", value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
index a78ed1f..19b8514 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,7 +43,7 @@ public class ImmutableListStateTest {
 	private final ListStateDescriptor<Long> listStateDesc =
 			new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableListState<Long> listState;
+	private ListState<Long> listState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -58,7 +59,7 @@ public class ImmutableListStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		List<Long> list = getStateContents();
 		assertEquals(1L, list.size());
 
@@ -69,7 +70,7 @@ public class ImmutableListStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		List<Long> list = getStateContents();
 		assertEquals(1L, list.size());
 
@@ -100,7 +101,7 @@ public class ImmutableListStateTest {
 		return baos.toByteArray();
 	}
 
-	private List<Long> getStateContents() {
+	private List<Long> getStateContents() throws Exception {
 		List<Long> list = new ArrayList<>();
 		for (Long elem: listState.get()) {
 			list.add(elem);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index ffeabae..6465257 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -44,7 +45,7 @@ public class ImmutableMapStateTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableMapState<Long, Long> mapState;
+	private MapState<Long, Long> mapState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -65,7 +66,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testPut() {
+	public void testPut() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -78,7 +79,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testPutAll() {
+	public void testPutAll() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -95,7 +96,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -108,7 +109,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testIterator() {
+	public void testIterator() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -124,7 +125,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testIterable() {
+	public void testIterable() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -142,7 +143,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testKeys() {
+	public void testKeys() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -158,7 +159,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testValues() {
+	public void testValues() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);
@@ -174,7 +175,7 @@ public class ImmutableMapStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		assertTrue(mapState.contains(1L));
 		long value = mapState.get(1L);
 		assertEquals(5L, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
index 9694f55..543f714 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
@@ -38,7 +39,7 @@ public class ImmutableReducingStateTest {
 	private final ReducingStateDescriptor<Long> reducingStateDesc =
 			new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableReducingState<Long> reduceState;
+	private ReducingState<Long> reduceState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -53,7 +54,7 @@ public class ImmutableReducingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws Exception {
 		long value = reduceState.get();
 		assertEquals(42L, value);
 
@@ -61,7 +62,7 @@ public class ImmutableReducingStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws Exception {
 		long value = reduceState.get();
 		assertEquals(42L, value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
index a0da43d..75f8f03 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.queryablestate.client.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
@@ -37,7 +39,7 @@ public class ImmutableValueStateTest {
 	private final ValueStateDescriptor<Long> valueStateDesc =
 			new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
 
-	private ImmutableValueState<Long> valueState;
+	private ValueState<Long> valueState;
 
 	@Before
 	public void setUp() throws Exception {
@@ -52,7 +54,7 @@ public class ImmutableValueStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
+	public void testUpdate() throws IOException {
 		long value = valueState.value();
 		assertEquals(42L, value);
 
@@ -60,7 +62,7 @@ public class ImmutableValueStateTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
+	public void testClear() throws IOException {
 		long value = valueState.value();
 		assertEquals(42L, value);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 6ee7631..a49fdd2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.PredefinedOptions;
@@ -40,8 +39,6 @@ import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import java.io.File;
-
 import static org.mockito.Mockito.mock;
 
 /**
@@ -54,43 +51,6 @@ public final class KVStateRequestSerializerRocksDBTest {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
-	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
-	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
-	 * the tests.
-	 *
-	 * @param <K> key type
-	 */
-	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
-		RocksDBKeyedStateBackend2(
-				final String operatorIdentifier,
-				final ClassLoader userCodeClassLoader,
-				final File instanceBasePath,
-				final DBOptions dbOptions,
-				final ColumnFamilyOptions columnFamilyOptions,
-				final TaskKvStateRegistry kvStateRegistry,
-				final TypeSerializer<K> keySerializer,
-				final int numberOfKeyGroups,
-				final KeyGroupRange keyGroupRange,
-				final ExecutionConfig executionConfig) throws Exception {
-
-			super(operatorIdentifier, userCodeClassLoader,
-				instanceBasePath,
-				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
-				numberOfKeyGroups, keyGroupRange, executionConfig, false,
-				TestLocalRecoveryConfig.disabled());
-		}
-
-		@Override
-		public <N, T> InternalListState<K, N, T> createListState(
-			final TypeSerializer<N> namespaceSerializer,
-			final ListStateDescriptor<T> stateDesc) throws Exception {
-
-			return super.createListState(namespaceSerializer, stateDesc);
-		}
-	}
-
-	/**
 	 * Tests list serialization and deserialization match.
 	 *
 	 * @see KvStateRequestSerializerTest#testListSerialization()
@@ -105,8 +65,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
 		dbOptions.setCreateIfMissing(true);
 		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
-		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
-			new RocksDBKeyedStateBackend2<>(
+		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend<>(
 				"no-op",
 				ClassLoader.getSystemClassLoader(),
 				temporaryFolder.getRoot(),
@@ -115,13 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
 				1, new KeyGroupRange(0, 0),
-				new ExecutionConfig()
+				new ExecutionConfig(), false,
+				TestLocalRecoveryConfig.disabled()
 			);
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
-			.createListState(VoidNamespaceSerializer.INSTANCE,
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
 		KvStateRequestSerializerTest.testListSerialization(key, listState);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 1dc7186..2ba7507 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,9 +200,9 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
-				VoidNamespaceSerializer.INSTANCE,
-				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+			VoidNamespaceSerializer.INSTANCE,
+			new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
 		testListSerialization(key, listState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f873655..1690240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -20,32 +20,13 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -73,30 +54,30 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	protected final TypeSerializer<K> keySerializer;
 
 	/** The currently active key. */
-	protected K currentKey;
+	private K currentKey;
 
-	/** The key group of the currently active key */
+	/** The key group of the currently active key. */
 	private int currentKeyGroup;
 
 	/** So that we can give out state when the user uses the same key. */
-	protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+	private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
 
-	/** For caching the last accessed partitioned state */
+	/** For caching the last accessed partitioned state. */
 	private String lastName;
 
 	@SuppressWarnings("rawtypes")
 	private InternalKvState lastState;
 
-	/** The number of key-groups aka max parallelism */
+	/** The number of key-groups aka max parallelism. */
 	protected final int numberOfKeyGroups;
 
-	/** Range of key-groups for which this backend is responsible */
+	/** Range of key-groups for which this backend is responsible. */
 	protected final KeyGroupRange keyGroupRange;
 
-	/** KvStateRegistry helper for this task */
+	/** KvStateRegistry helper for this task. */
 	protected final TaskKvStateRegistry kvStateRegistry;
 
-	/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+	/** Registry for all opened streams, so they can be closed if the task using this backend is closed. */
 	protected CloseableRegistry cancelStreamRegistry;
 
 	protected final ClassLoader userCodeClassLoader;
@@ -153,87 +134,19 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	}
 
 	/**
-	 * Creates and returns a new {@link ValueState}.
+	 * Creates and returns a new {@link State}.
 	 *
 	 * @param namespaceSerializer TypeSerializer for the state namespace.
 	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
 	 *
 	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 * @param <SV> The type of the stored state value.
+	 * @param <S> The type of the public API state.
+	 * @param <IS> The type of internal state.
 	 */
-	protected abstract <N, T> InternalValueState<K, N, T> createValueState(
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> InternalListState<K, N, T> createListState(
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
-			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 *
-	 * @deprecated will be removed in a future version
-	 */
-	@Deprecated
-	protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link MapState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <UK> Type of the keys in the state
-	 * @param <UV> Type of the values in the state	 *
-	 */
-	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
-			TypeSerializer<N> namespaceSerializer,
-			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+	public abstract <N, SV, S extends State, IS extends S> IS createState(
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception;
 
 	/**
 	 * @see KeyedStateBackend
@@ -311,8 +224,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
 					throw new RuntimeException(e);
 				}
 			});
-		} catch (RuntimeException e) {
-			throw e;
 		}
 	}
 
@@ -320,6 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @see KeyedStateBackend
 	 */
 	@Override
+	@SuppressWarnings("unchecked")
 	public <N, S extends State, V> S getOrCreateKeyedState(
 			final TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, V> stateDescriptor) throws Exception {
@@ -343,43 +255,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			return typedState;
 		}
 
-		// create a new blank key/value state
-		S state = stateDescriptor.bind(new StateBinder() {
-			@Override
-			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
-					AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-				return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
-			}
-
-		});
-
-		@SuppressWarnings("unchecked")
-		InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
+		InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
 		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
 
 		// Publish queryable state
@@ -392,14 +268,14 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
 		}
 
-		return state;
+		return (S) kvState;
 	}
 
 	/**
 	 * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
 	 *       This method should be removed for the sake of namespaces being lazily fetched from the keyed
 	 *       state backend, or being set on the state directly.
-	 * 
+	 *
 	 * @see KeyedStateBackend
 	 */
 	@SuppressWarnings("unchecked")
@@ -445,7 +321,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	}
 
 	@VisibleForTesting
-	public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+	StreamCompressionDecorator getKeyGroupCompressionDecorator() {
 		return keyGroupCompressionDecorator;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 97d1ad0..62aa30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -40,7 +43,6 @@ import java.io.IOException;
 class HeapAggregatingState<K, N, IN, ACC, OUT>
 	extends AbstractHeapMergingState<K, N, IN, ACC, OUT>
 	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
-
 	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
 
 	/**
@@ -53,7 +55,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 	 * @param defaultValue The default value for the state.
 	 * @param aggregateFunction The aggregating function used for aggregating state.
 	 */
-	HeapAggregatingState(
+	private HeapAggregatingState(
 		StateTable<K, N, ACC> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<ACC> valueSerializer,
@@ -111,7 +113,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected ACC mergeState(ACC a, ACC b) throws Exception {
+	protected ACC mergeState(ACC a, ACC b) {
 		return aggregateTransformation.aggFunction.merge(a, b);
 	}
 
@@ -124,11 +126,25 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
 		}
 
 		@Override
-		public ACC apply(ACC accumulator, IN value) throws Exception {
+		public ACC apply(ACC accumulator, IN value) {
 			if (accumulator == null) {
 				accumulator = aggFunction.createAccumulator();
 			}
 			return aggFunction.add(value, accumulator);
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <T, K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		StateTable<K, N, SV> stateTable,
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapAggregatingState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			stateDesc.getDefaultValue(),
+			((AggregatingStateDescriptor<T, SV, ?>) stateDesc).getAggregateFunction());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index d61c4c5..1bca719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -41,7 +44,6 @@ import java.io.IOException;
 class HeapFoldingState<K, N, T, ACC>
 	extends AbstractHeapAppendingState<K, N, T, ACC, ACC>
 	implements InternalFoldingState<K, N, T, ACC> {
-
 	/** The function used to fold the state. */
 	private final FoldTransformation foldTransformation;
 
@@ -55,7 +57,7 @@ class HeapFoldingState<K, N, T, ACC>
 	 * @param defaultValue The default value for the state.
 	 * @param foldFunction The fold function used for folding state.
 	 */
-	HeapFoldingState(
+	private HeapFoldingState(
 		StateTable<K, N, ACC> stateTable,
 		TypeSerializer<K> keySerializer,
 		TypeSerializer<ACC> valueSerializer,
@@ -118,4 +120,18 @@ class HeapFoldingState<K, N, T, ACC>
 			return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	static <K, N, SV, S extends State, IS extends S> IS create(
+		StateDescriptor<S, SV> stateDesc,
+		StateTable<K, N, SV> stateTable,
+		TypeSerializer<K> keySerializer) {
+		return (IS) new HeapFoldingState<>(
+			stateTable,
+			keySerializer,
+			stateTable.getStateSerializer(),
+			stateTable.getNamespaceSerializer(),
+			stateDesc.getDefaultValue(),
+			((FoldingStateDescriptor<SV, SV>) stateDesc).getFoldFunction());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 76479b0..82ce584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -59,12 +59,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -97,6 +92,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
 
+	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
+			Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
+			Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
+			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
+			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
+			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private interface StateFactory {
+		<K, N, SV, S extends State, IS extends S> IS createState(
+			StateDescriptor<S, SV> stateDesc,
+			StateTable<K, N, SV> stateTable,
+			TypeSerializer<K> keySerializer) throws Exception;
+	}
+
 	/**
 	 * Map of state tables that stores all state of key/value states. We store it centrally so
 	 * that we can easily checkpoint/restore it.
@@ -110,8 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Map of state names to their corresponding restored state meta info.
 	 *
-	 * <p>
-	 * TODO this map can be removed when eager-state registration is in place.
+	 * <p>TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
 	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -203,91 +214,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, V> InternalValueState<K, N, V> createValueState(
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<V> stateDesc) throws Exception {
-
-		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapValueState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
-	}
-
-	@Override
-	public <N, T> InternalListState<K, N, T> createListState(
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception {
-
-		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapListState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
-	}
-
-	@Override
-	public <N, T> InternalReducingState<K, N, T> createReducingState(
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapReducingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getReduceFunction());
-	}
-
-	@Override
-	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
-			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
-		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapAggregatingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getAggregateFunction());
-	}
-
-	@Override
-	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
-		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapFoldingState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue(),
-				stateDesc.getFoldFunction());
-	}
-
-	@Override
-	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
-			TypeSerializer<N> namespaceSerializer,
-			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
-		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-
-		return new HeapMapState<>(
-				stateTable,
-				keySerializer,
-				stateTable.getStateSerializer(),
-				stateTable.getNamespaceSerializer(),
-				stateDesc.getDefaultValue());
+	public <N, SV, S extends State, IS extends S> IS createState(
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception {
+		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), this.getClass());
+			throw new FlinkRuntimeException(message);
+		}
+		StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+		return stateFactory.createState(stateDesc, stateTable, keySerializer);
 	}
 
 	@Override
@@ -411,7 +348,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					int writtenKeyGroupIndex = inView.readInt();
 
 					try (InputStream kgCompressionInStream =
-							 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+							streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
 
 						DataInputViewStreamWrapper kgCompressionInView =
 							new DataInputViewStreamWrapper(kgCompressionInStream);