You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/28 09:06:27 UTC

[2/2] flink git commit: [FLINK-8715] Ensure use of reconfigured serializers in state backends

[FLINK-8715] Ensure use of reconfigured serializers in state backends

This commit consists of the following changes:
1. No longer use StateDescriptors in state handle classes. Instead,
state handles always work with serializers directly.
2. Refactor state registration paths to have better separation of
concerns.

This closes #5885.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07aa2d46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07aa2d46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07aa2d46

Branch: refs/heads/master
Commit: 07aa2d469e34884d715b01166db077a4cf7cf3af
Parents: ff62977
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Apr 20 21:15:42 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Apr 28 17:05:27 2018 +0800

----------------------------------------------------------------------
 .../api/common/typeutils/CompatibilityUtil.java |   2 +
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../RegisteredKeyedBackendStateMetaInfo.java    |  56 ++++
 .../state/heap/AbstractHeapMergingState.java    |  23 +-
 .../runtime/state/heap/AbstractHeapState.java   |  35 ++-
 .../state/heap/HeapAggregatingState.java        |  27 +-
 .../runtime/state/heap/HeapFoldingState.java    |  36 ++-
 .../state/heap/HeapKeyedStateBackend.java       | 133 ++++----
 .../flink/runtime/state/heap/HeapListState.java |  20 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  25 +-
 .../runtime/state/heap/HeapReducingState.java   |  26 +-
 .../runtime/state/heap/HeapValueState.java      |  22 +-
 .../runtime/state/StateBackendTestBase.java     | 314 ++++++++++++++++++-
 .../streaming/state/AbstractRocksDBState.java   |  31 +-
 .../state/RocksDBAggregatingState.java          |  28 +-
 .../streaming/state/RocksDBFoldingState.java    |  26 +-
 .../state/RocksDBKeyedStateBackend.java         | 168 +++++-----
 .../streaming/state/RocksDBListState.java       |  31 +-
 .../streaming/state/RocksDBMapState.java        |  22 +-
 .../streaming/state/RocksDBReducingState.java   |  24 +-
 .../streaming/state/RocksDBValueState.java      |  25 +-
 21 files changed, 753 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index df7f216..6c8583c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -30,6 +30,8 @@ public class CompatibilityUtil {
 	/**
 	 * Resolves the final compatibility result of two serializers by taking into account compound information,
 	 * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer.
+	 * As a side effect, the provided new serializer may be reconfigured by this method in order to
+	 * remain compatible.
 	 *
 	 * The final result is determined as follows:
 	 *   1. If there is no configuration snapshot of the preceding serializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 01a397a..edbd605 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -757,16 +757,17 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 				(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredOperatorStateMetaInfos.get(name);
 
 			// check compatibility to determine if state migration is required
+			TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
 			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
 					restoredMetaInfo.getPartitionStateSerializer(),
 					UnloadableDummyTypeSerializer.class,
 					restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
-					partitionStateSerializer);
+					newPartitionStateSerializer);
 
 			if (!stateCompatibility.isRequiresMigration()) {
 				// new serializer is compatible; use it to replace the old serializer
 				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode));
+					new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
 			} else {
 				// TODO state migration currently isn't possible.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
index 342bc7f..c7952ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 
 import java.util.Objects;
 
@@ -240,4 +244,56 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
 			return result;
 		}
 	}
+
+	/**
+	 * Checks compatibility of a restored k/v state with the new {@link StateDescriptor} provided for it.
+	 * This verifies that the descriptor specifies an identical name and state type, as well as
+	 * serializers that are compatible with the restored k/v state bytes.
+	 */
+	public static  <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
+		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredStateMetaInfoSnapshot,
+		TypeSerializer<N> newNamespaceSerializer,
+		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+		Preconditions.checkState(
+			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			"Incompatible state names. " +
+				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + newStateDescriptor.getName() + "].");
+
+		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+			&& !Objects.equals(restoredStateMetaInfoSnapshot.getStateType(), StateDescriptor.Type.UNKNOWN)) {
+
+			Preconditions.checkState(
+				newStateDescriptor.getType() == restoredStateMetaInfoSnapshot.getStateType(),
+				"Incompatible state types. " +
+					"Was [" + restoredStateMetaInfoSnapshot.getStateType() + "], " +
+					"registered with [" + newStateDescriptor.getType() + "].");
+		}
+
+		// check compatibility results to determine if state migration is required
+		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getNamespaceSerializer(),
+			null,
+			restoredStateMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+			newNamespaceSerializer);
+
+		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getStateSerializer(),
+			UnloadableDummyTypeSerializer.class,
+			restoredStateMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+			newStateSerializer);
+
+		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+			// TODO state migration currently isn't possible.
+			throw new StateMigrationException("State migration isn't supported, yet.");
+		} else {
+			return new RegisteredKeyedBackendStateMetaInfo<>(
+				newStateDescriptor.getType(),
+				newStateDescriptor.getName(),
+				newNamespaceSerializer,
+				newStateSerializer);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index df762b4..2ecff46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
@@ -36,10 +35,9 @@ import java.util.Collection;
  * @param <SV> The type of the values in the state.
  * @param <OUT> The type of the output elements.
  * @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State, SD extends StateDescriptor<S, SV>>
-		extends AbstractHeapState<K, N, SV, S, SD>
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State>
+		extends AbstractHeapState<K, N, SV, S>
 		implements InternalMergingState<K, N, IN, SV, OUT> {
 
 	/**
@@ -50,17 +48,20 @@ public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends Stat
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
 	 */
 	protected AbstractHeapMergingState(
-			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
+			TypeSerializer<SV> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			SV defaultValue) {
 
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
 		this.mergeTransformation = new MergeTransformation();
 	}
 
@@ -106,4 +107,4 @@ public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends Stat
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index e889c53..f824e84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -35,40 +34,44 @@ import org.apache.flink.util.Preconditions;
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
  * @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<K, N, SV> {
+public abstract class AbstractHeapState<K, N, SV, S extends State> implements InternalKvState<K, N, SV> {
 
 	/** Map containing the actual key/value pairs. */
 	protected final StateTable<K, N, SV> stateTable;
 
-	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final SD stateDesc;
-
 	/** The current namespace, which the access methods will refer to. */
 	protected N currentNamespace;
 
 	protected final TypeSerializer<K> keySerializer;
 
+	protected final TypeSerializer<SV> valueSerializer;
+
 	protected final TypeSerializer<N> namespaceSerializer;
 
+	private final SV defaultValue;
+
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
 	 */
 	protected AbstractHeapState(
-			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
+			TypeSerializer<SV> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			SV defaultValue) {
 
-		this.stateDesc = stateDesc;
 		this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
 		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
 		this.namespaceSerializer = namespaceSerializer;
+		this.defaultValue = defaultValue;
 		this.currentNamespace = null;
 	}
 
@@ -114,4 +117,12 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	public StateTable<K, N, SV> getStateTable() {
 		return stateTable;
 	}
+
+	protected SV getDefaultValue() {
+		if (defaultValue != null) {
+			return valueSerializer.copy(defaultValue);
+		} else {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 8e58ac8..84a6d57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -39,7 +38,7 @@ import java.io.IOException;
  * @param <OUT> The type of the value returned from the state.
  */
 public class HeapAggregatingState<K, N, IN, ACC, OUT>
-		extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+		extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>>
 		implements InternalAggregatingState<K, N, IN, ACC, OUT> {
 
 	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
@@ -47,21 +46,23 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc
-	 *             The state identifier for the state. This contains name and can create a default state value.
-	 * @param stateTable
-	 *             The state table to use in this kev/value state. May contain initial state.
-	 * @param namespaceSerializer
-	 *             The serializer for the type that indicates the namespace
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
+	 * @param aggregateFunction The aggregating function used for aggregating state.
 	 */
 	public HeapAggregatingState(
-			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
 			StateTable<K, N, ACC> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
+			TypeSerializer<ACC> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ACC defaultValue,
+			AggregateFunction<IN, ACC, OUT> aggregateFunction) {
 
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+		this.aggregateTransformation = new AggregateTransformation<>(aggregateFunction);
 	}
 
 	@Override
@@ -76,7 +77,7 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 
 	@Override
 	public TypeSerializer<ACC> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index ed1d0de..3fa0a5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -40,26 +39,31 @@ import java.io.IOException;
  */
 @Deprecated
 public class HeapFoldingState<K, N, T, ACC>
-		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>>
 		implements InternalFoldingState<K, N, T, ACC> {
 
 	/** The function used to fold the state */
-	private final FoldTransformation<T, ACC> foldTransformation;
+	private final FoldTransformation foldTransformation;
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
+	 * @param foldFunction The fold function used for folding state.
 	 */
 	public HeapFoldingState(
-			FoldingStateDescriptor<T, ACC> stateDesc,
 			StateTable<K, N, ACC> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.foldTransformation = new FoldTransformation<>(stateDesc);
+			TypeSerializer<ACC> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ACC defaultValue,
+			FoldFunction<T, ACC> foldFunction) {
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+		this.foldTransformation = new FoldTransformation(foldFunction);
 	}
 
 	@Override
@@ -74,7 +78,7 @@ public class HeapFoldingState<K, N, T, ACC>
 
 	@Override
 	public TypeSerializer<ACC> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	// ------------------------------------------------------------------------
@@ -101,19 +105,17 @@ public class HeapFoldingState<K, N, T, ACC>
 		}
 	}
 
-	private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+	private final class FoldTransformation implements StateTransformationFunction<ACC, T> {
 
-		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
 		private final FoldFunction<T, ACC> foldFunction;
 
-		FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
-			this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
-			this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+		FoldTransformation(FoldFunction<T, ACC> foldFunction) {
+			this.foldFunction = Preconditions.checkNotNull(foldFunction);
 		}
 
 		@Override
 		public ACC apply(ACC previousState, T value) throws Exception {
-			return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
+			return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 10803e2..568ab3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -157,75 +156,35 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
 			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
 
-		return tryRegisterStateTable(
-				stateDesc.getName(), stateDesc.getType(),
-				namespaceSerializer, stateDesc.getSerializer());
-	}
+		@SuppressWarnings("unchecked")
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
 
-	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			String stateName,
-			StateDescriptor.Type stateType,
-			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<V> valueSerializer) throws StateMigrationException {
+		RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo;
+		if (stateTable != null) {
+			@SuppressWarnings("unchecked")
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfoSnapshot =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateDesc.getName());
 
-		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
-				new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+			Preconditions.checkState(
+				restoredMetaInfoSnapshot != null,
+				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+					" but its corresponding restored snapshot cannot be found.");
 
-		@SuppressWarnings("unchecked")
-		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
+			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+				restoredMetaInfoSnapshot,
+				namespaceSerializer,
+				stateDesc);
 
-		if (stateTable == null) {
-			stateTable = snapshotStrategy.newStateTable(newMetaInfo);
-			stateTables.put(stateName, stateTable);
+			stateTable.setMetaInfo(newMetaInfo);
 		} else {
-			// TODO with eager registration in place, these checks should be moved to restorePartitionedState()
+			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+				stateDesc.getType(),
+				stateDesc.getName(),
+				namespaceSerializer,
+				stateDesc.getSerializer());
 
-			Preconditions.checkState(
-				stateName.equals(stateTable.getMetaInfo().getName()),
-				"Incompatible state names. " +
-					"Was [" + stateTable.getMetaInfo().getName() + "], " +
-					"registered with [" + newMetaInfo.getName() + "].");
-
-			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
-					&& !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
-
-				Preconditions.checkState(
-					newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
-					"Incompatible state types. " +
-						"Was [" + stateTable.getMetaInfo().getStateType() + "], " +
-						"registered with [" + newMetaInfo.getStateType() + "].");
-			}
-
-			@SuppressWarnings("unchecked")
-			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
-				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
-
-			// check compatibility results to determine if state migration is required
-			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getNamespaceSerializer(),
-					null,
-					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-					newMetaInfo.getNamespaceSerializer());
-
-			CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getStateSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					restoredMetaInfo.getStateSerializerConfigSnapshot(),
-					newMetaInfo.getStateSerializer());
-
-			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
-				// new serializers are compatible; use them to replace the old serializers
-				stateTable.setMetaInfo(newMetaInfo);
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw new StateMigrationException("State migration isn't supported, yet.");
-			}
+			stateTable = snapshotStrategy.newStateTable(newMetaInfo);
+			stateTables.put(stateDesc.getName(), stateTable);
 		}
 
 		return stateTable;
@@ -250,7 +209,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ValueStateDescriptor<V> stateDesc) throws Exception {
 
 		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapValueState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue());
 	}
 
 	@Override
@@ -259,7 +223,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
 		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapListState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue());
 	}
 
 	@Override
@@ -268,7 +237,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
 		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapReducingState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getReduceFunction());
 	}
 
 	@Override
@@ -277,7 +252,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
 		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapAggregatingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapAggregatingState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getAggregateFunction());
 	}
 
 	@Override
@@ -286,7 +267,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
 		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapFoldingState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getFoldFunction());
 	}
 
 	@Override
@@ -295,7 +282,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 
 		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+
+		return new HeapMapState<>(
+				stateTable,
+				keySerializer,
+				stateTable.getStateSerializer(),
+				stateTable.getNamespaceSerializer(),
+				stateDesc.getDefaultValue());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index bd68560..5e4e471 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -40,22 +39,25 @@ import java.util.List;
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>, ListStateDescriptor<V>>
+		extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>>
 		implements InternalListState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
 	 */
 	public HeapListState(
-			ListStateDescriptor<V> stateDesc,
 			StateTable<K, N, List<V>> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+			TypeSerializer<List<V>> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			List<V> defaultValue) {
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
 	}
 
 	@Override
@@ -70,7 +72,7 @@ public class HeapListState<K, N, V>
 
 	@Override
 	public TypeSerializer<List<V>> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index ccd017f..36743e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -40,22 +39,27 @@ import java.util.Map;
  * @param <UV> The type of the values in the state.
  */
 public class HeapMapState<K, N, UK, UV>
-		extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+		extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>>
 		implements InternalMapState<K, N, UK, UV> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc  The state identifier for the state. This contains name
-	 *                   and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
 	 */
 	public HeapMapState(
-			MapStateDescriptor<UK, UV> stateDesc,
 			StateTable<K, N, Map<UK, UV>> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+			TypeSerializer<Map<UK, UV>> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			Map<UK, UV> defaultValue) {
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+
+		Preconditions.checkState(valueSerializer instanceof MapSerializer, "Unexpected serializer type.");
 	}
 
 	@Override
@@ -70,10 +74,7 @@ public class HeapMapState<K, N, UK, UV>
 
 	@Override
 	public TypeSerializer<Map<UK, UV>> getValueSerializer() {
-		return new MapSerializer<>(
-				stateDesc.getKeySerializer(),
-				stateDesc.getValueSerializer()
-		);
+		return valueSerializer;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 58b3128..47f6279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -36,7 +35,7 @@ import java.io.IOException;
  * @param <V> The type of the value.
  */
 public class HeapReducingState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
+		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>>
 		implements InternalReducingState<K, N, V> {
 
 	private final ReduceTransformation<V> reduceTransformation;
@@ -44,18 +43,23 @@ public class HeapReducingState<K, N, V>
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state table to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
+	 * @param reduceFunction The reduce function used for reducing state.
 	 */
 	public HeapReducingState(
-			ReducingStateDescriptor<V> stateDesc,
 			StateTable<K, N, V> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
+			TypeSerializer<V> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			V defaultValue,
+			ReduceFunction<V> reduceFunction) {
 
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+		this.reduceTransformation = new ReduceTransformation<>(reduceFunction);
 	}
 
 	@Override
@@ -70,7 +74,7 @@ public class HeapReducingState<K, N, V>
 
 	@Override
 	public TypeSerializer<V> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	// ------------------------------------------------------------------------
@@ -119,4 +123,4 @@ public class HeapReducingState<K, N, V>
 			return previousState != null ? reduceFunction.reduce(previousState, value) : value;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index bf0a3cf..95e81e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 
@@ -31,22 +30,25 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
  * @param <V> The type of the value.
  */
 public class HeapValueState<K, N, V>
-		extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+		extends AbstractHeapState<K, N, V, ValueState<V>>
 		implements InternalValueState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table that this state is associated with.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the state.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param defaultValue The default value for the state.
 	 */
 	public HeapValueState(
-			ValueStateDescriptor<V> stateDesc,
 			StateTable<K, N, V> stateTable,
 			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) {
-		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+			TypeSerializer<V> valueSerializer,
+			TypeSerializer<N> namespaceSerializer,
+			V defaultValue) {
+		super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
 	}
 
 	@Override
@@ -61,7 +63,7 @@ public class HeapValueState<K, N, V>
 
 	@Override
 	public TypeSerializer<V> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	@Override
@@ -69,7 +71,7 @@ public class HeapValueState<K, N, V>
 		final V result = stateTable.get(currentNamespace);
 
 		if (result == null) {
-			return stateDesc.getDefaultValue();
+			return getDefaultValue();
 		}
 
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 41df1b3..5737964 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -40,7 +40,10 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -51,6 +54,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -82,6 +87,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -90,6 +96,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
@@ -759,6 +766,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		Environment env = new DummyEnvironment();
@@ -777,6 +785,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
 		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
+		// access the internal state representation to retrieve the original Kryo registration ids;
+		// these will be later used to check that on restore, the new Kryo serializer has reconfigured itself to
+		// have identical mappings
+		InternalKvState internalKvState = (InternalKvState) state;
+		KryoSerializer<TestPojo> kryoSerializer = (KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
+		int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
+		int nestedPojoClassARegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
+		int nestedPojoClassBRegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();
+
 		// ============== create snapshot of current configuration ==============
 
 		// make some more modifications
@@ -810,6 +827,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		kvId = new ValueStateDescriptor<>("id", pojoType);
 		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
+		// verify that on restore, the serializer that the state handle uses has reconfigured itself to have
+		// identical Kryo registration ids compared to the previous execution
+		internalKvState = (InternalKvState) state;
+		kryoSerializer = (KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
+		assertEquals(mainPojoClassRegistrationId, kryoSerializer.getKryo().getRegistration(TestPojo.class).getId());
+		assertEquals(nestedPojoClassARegistrationId, kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId());
+		assertEquals(nestedPojoClassBRegistrationId, kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId());
+
 		backend.setCurrentKey(1);
 
 		// update to test state backends that eagerly serialize, such as RocksDB
@@ -893,6 +918,110 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testStateSerializerReconfiguration() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		Environment env = new DummyEnvironment();
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		try {
+			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// ============== create snapshot, using the non-reconfigured serializer ==============
+
+			// make some modifications
+			backend.setCurrentKey(1);
+			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
+
+			backend.setCurrentKey(2);
+			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
+
+			// verify our assumption that the serializer is not yet reconfigured;
+			// we cast the state handle to the internal representation in order to retrieve the serializer
+			InternalKvState internal = (InternalKvState) state;
+			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+			assertFalse(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+			KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+			snapshot1.registerSharedStates(sharedStateRegistry);
+			backend.dispose();
+
+			// ========== restore snapshot, which should reconfigure the serializer, and then create a snapshot again ==========
+
+			env = new DummyEnvironment();
+
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
+
+			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// verify that the serializer used is correctly reconfigured
+			internal = (InternalKvState) state;
+			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+			backend.setCurrentKey(1);
+			TestCustomStateClass restoredState1 = state.value();
+			assertEquals("test-message-1", restoredState1.getMessage());
+			// the previous serializer schema does not contain the extra message
+			assertNull(restoredState1.getExtraMessage());
+
+			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
+
+			backend.setCurrentKey(2);
+			TestCustomStateClass restoredState2 = state.value();
+			assertEquals("test-message-2", restoredState2.getMessage());
+			assertNull(restoredState2.getExtraMessage());
+
+			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
+
+			KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462379L,
+				3,
+				streamFactory,
+				CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+			snapshot2.registerSharedStates(sharedStateRegistry);
+			snapshot1.discardState();
+			backend.dispose();
+
+			// ========== restore snapshot again; state should now be in the new schema containing the extra message ==========
+
+			env = new DummyEnvironment();
+
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+			snapshot2.discardState();
+
+			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			internal = (InternalKvState) state;
+			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+			backend.setCurrentKey(1);
+			restoredState1 = state.value();
+			assertEquals("new-test-message-1", restoredState1.getMessage());
+			assertEquals("extra-message-1", restoredState1.getExtraMessage());
+
+			backend.setCurrentKey(2);
+			restoredState2 = state.value();
+			assertEquals("new-test-message-2", restoredState2.getMessage());
+			assertEquals("extra-message-2", restoredState2.getExtraMessage());
+		} finally {
+			backend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3074,7 +3203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.update(121818273);
 
-			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
 			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 
 		}
@@ -3096,7 +3225,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? , ?>) kvState).getStateTable();
 			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
@@ -3123,7 +3252,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
 			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
@@ -3150,7 +3279,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
 			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
@@ -3896,6 +4025,183 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
+	 * Custom state class used for testing state serializer schema migration.
+	 * The corresponding serializer used in the tests is {@link TestReconfigurableCustomTypeSerializer}.
+	 */
+	public static class TestCustomStateClass {
+		// plain mutable holder for two strings; no null checks are performed on either field
+		private String message;
+		private String extraMessage;
+
+		public TestCustomStateClass(String initialMessage, String initialExtraMessage) {
+			message = initialMessage;
+			extraMessage = initialExtraMessage;
+		}
+
+		public String getMessage() {
+			return this.message;
+		}
+
+		public String getExtraMessage() {
+			return this.extraMessage;
+		}
+
+		public void setMessage(String newMessage) {
+			message = newMessage;
+		}
+
+		public void setExtraMessage(String newExtraMessage) {
+			extraMessage = newExtraMessage;
+		}
+	}
+
+	/**
+	 * A reconfigurable serializer that simulates backwards compatible schema evolution for the {@link TestCustomStateClass}.
+	 * A flag is maintained to determine whether or not the serializer has been reconfigured.
+	 * Whether or not it has been reconfigured affects which fields of {@link TestCustomStateClass} instances are
+	 * written and read on serialization.
+	 */
+	public static class TestReconfigurableCustomTypeSerializer extends TypeSerializer<TestCustomStateClass> {
+
+		/** Set by ensureCompatibility() or the copy constructor; selects which schema serialize() writes. */
+		private boolean reconfigured = false;
+
+		public TestReconfigurableCustomTypeSerializer() {}
+
+		/** Copy constructor. */
+		private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
+			this.reconfigured = reconfigured;
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			// the snapshot carries nothing but this class name as the format identifier
+			return new ParameterlessTypeSerializerConfig(getClass().getName());
+		}
+
+		@Override
+		public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof ParameterlessTypeSerializerConfig &&
+					((ParameterlessTypeSerializerConfig) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
+
+				// snapshot was taken by this serializer class: switch to the new schema
+				this.reconfigured = true;
+				return CompatibilityResult.compatible();
+			} else {
+				return CompatibilityResult.requiresMigration();
+			}
+		}
+
+		@Override
+		public TypeSerializer<TestCustomStateClass> duplicate() {
+			// carry the flag over so that duplicates write the same schema
+			return new TestReconfigurableCustomTypeSerializer(reconfigured);
+		}
+
+		@Override
+		public TestCustomStateClass createInstance() {
+			return new TestCustomStateClass(null, null);
+		}
+
+		@Override
+		public void serialize(TestCustomStateClass record, DataOutputView target) throws IOException {
+			// leading schema flag tells deserialize() and copy() whether an extra message follows
+			target.writeBoolean(reconfigured);
+
+			target.writeUTF(record.getMessage());
+			if (reconfigured) {
+				target.writeUTF(record.getExtraMessage());
+			}
+		}
+
+		@Override
+		public TestCustomStateClass deserialize(DataInputView source) throws IOException {
+			boolean isNewSchema = source.readBoolean();
+
+			String message = source.readUTF();
+			// the old schema carries no extra message
+			String extraMessage = isNewSchema ? source.readUTF() : null;
+			return new TestCustomStateClass(message, extraMessage);
+		}
+
+		@Override
+		public TestCustomStateClass deserialize(TestCustomStateClass reuse, DataInputView source) throws IOException {
+			boolean isNewSchema = source.readBoolean();
+
+			reuse.setMessage(source.readUTF());
+			// the old schema carries no extra message, so clear whatever the reused instance held
+			reuse.setExtraMessage(isNewSchema ? source.readUTF() : null);
+			return reuse;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			boolean isNewSchema = source.readBoolean();
+
+			// forward the schema flag as well; without it the copy cannot be read by deserialize()
+			target.writeBoolean(isNewSchema);
+			target.writeUTF(source.readUTF());
+			if (isNewSchema) {
+				target.writeUTF(source.readUTF());
+			}
+		}
+
+		@Override
+		public TestCustomStateClass copy(TestCustomStateClass from) {
+			return new TestCustomStateClass(from.getMessage(), from.getExtraMessage());
+		}
+
+		@Override
+		public TestCustomStateClass copy(TestCustomStateClass from, TestCustomStateClass reuse) {
+			reuse.setMessage(from.getMessage());
+			reuse.setExtraMessage(from.getExtraMessage());
+			return reuse;
+		}
+
+		@Override
+		public int getLength() {
+			// records are variable-length (UTF strings); TypeSerializer uses -1 to signal that
+			return -1;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			// TestCustomStateClass exposes setters
+			return false;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestReconfigurableCustomTypeSerializer;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+
+			// instanceof is false for null, so no separate null check is needed
+			if (!(obj instanceof TestReconfigurableCustomTypeSerializer)) {
+				return false;
+			}
+
+			return ((TestReconfigurableCustomTypeSerializer) obj).reconfigured == this.reconfigured;
+		}
+
+		@Override
+		public int hashCode() {
+			// uses the same flag that equals() compares
+			return Objects.hash(getClass().getName(), reconfigured);
+		}
+
+		/** Returns whether this instance has switched to the new schema. */
+		public boolean isReconfigured() {
+			return reconfigured;
+		}
+	}
+
+	/**
 	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
 	 */
 	private static class ExpectedKryoTestException extends RuntimeException {}

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 8c59979..92c9e80 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -18,7 +18,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -45,13 +44,15 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <V> The type of values kept internally in state.
  * @param <S> The type of {@link State}.
- * @param <SD> The type of {@link StateDescriptor}.
  */
-public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends StateDescriptor<S, V>> implements InternalKvState<K, N, V>, State {
+public abstract class AbstractRocksDBState<K, N, V, S extends State> implements InternalKvState<K, N, V>, State {
 
 	/** Serializer for the namespace. */
 	final TypeSerializer<N> namespaceSerializer;
 
+	/** Serializer for the state values. */
+	final TypeSerializer<V> valueSerializer;
+
 	/** The current namespace, which the next value methods will refer to. */
 	private N currentNamespace;
 
@@ -61,8 +62,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
 	/** The column family of this particular instance of state. */
 	protected ColumnFamilyHandle columnFamily;
 
-	/** State descriptor from which to create this state instance. */
-	protected final SD stateDesc;
+	protected final V defaultValue;
 
 	protected final WriteOptions writeOptions;
 
@@ -74,12 +74,18 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
 
 	/**
 	 * Creates a new RocksDB backed state.
-	 *  @param namespaceSerializer The serializer for the namespace.
+	 *
+	 * @param columnFamily The RocksDB column family that this state is associated with.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param valueSerializer The serializer for the state.
+	 * @param defaultValue The default value for the state.
+	 * @param backend The backend to which this state is bound.
 	 */
 	protected AbstractRocksDBState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
-			SD stateDesc,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue,
 			RocksDBKeyedStateBackend<K> backend) {
 
 		this.namespaceSerializer = namespaceSerializer;
@@ -88,7 +94,8 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
 		this.columnFamily = columnFamily;
 
 		this.writeOptions = backend.getWriteOptions();
-		this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
+		this.defaultValue = defaultValue;
 
 		this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
 		this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
@@ -190,4 +197,12 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
 		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
 		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
 	}
+
+	protected V getDefaultValue() {
+		if (defaultValue != null) {
+			return valueSerializer.copy(defaultValue);
+		} else {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 14671a5..f8d7980 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,33 +42,32 @@ import java.util.Collection;
  * @param <R> The type of the value returned from the state
  */
 public class RocksDBAggregatingState<K, N, T, ACC, R>
-		extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>>
+		extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>>
 		implements InternalAggregatingState<K, N, T, ACC, R> {
 
-	/** Serializer for the values. */
-	private final TypeSerializer<ACC> valueSerializer;
-
 	/** User-specified aggregation function. */
 	private final AggregateFunction<T, ACC, R> aggFunction;
 
 	/**
 	 * Creates a new {@code RocksDBAggregatingState}.
 	 *
-	 * @param namespaceSerializer
-	 *             The serializer for the namespace.
-	 * @param stateDesc
-	 *             The state identifier for the state. This contains the state name and aggregation function.
+	 * @param columnFamily The RocksDB column family that this state is associated with.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param valueSerializer The serializer for the state.
+	 * @param defaultValue The default value for the state.
+	 * @param aggFunction The aggregate function used for aggregating state.
+	 * @param backend The backend to which this state is bound.
 	 */
 	public RocksDBAggregatingState(
 			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc,
+			TypeSerializer<ACC> valueSerializer,
+			ACC defaultValue,
+			AggregateFunction<T, ACC, R> aggFunction,
 			RocksDBKeyedStateBackend<K> backend) {
 
-		super(columnFamily, namespaceSerializer, stateDesc, backend);
-
-		this.valueSerializer = stateDesc.getSerializer();
-		this.aggFunction = stateDesc.getAggregateFunction();
+		super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
+		this.aggFunction = aggFunction;
 	}
 
 	@Override
@@ -84,7 +82,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 
 	@Override
 	public TypeSerializer<ACC> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 6f5baff..4bc6bf9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -44,31 +43,32 @@ import java.io.IOException;
  */
 @Deprecated
 public class RocksDBFoldingState<K, N, T, ACC>
-		extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+		extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>>
 		implements InternalFoldingState<K, N, T, ACC> {
 
-	/** Serializer for the values. */
-	private final TypeSerializer<ACC> valueSerializer;
-
 	/** User-specified fold function. */
 	private final FoldFunction<T, ACC> foldFunction;
 
 	/**
 	 * Creates a new {@code RocksDBFoldingState}.
 	 *
+	 * @param columnFamily The RocksDB column family that this state is associated with.
 	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
+	 * @param valueSerializer The serializer for the state.
+	 * @param defaultValue The default value for the state.
+	 * @param foldFunction The fold function used for folding state.
+	 * @param backend The backend to which this state is bound.
 	 */
 	public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc,
+			TypeSerializer<ACC> valueSerializer,
+			ACC defaultValue,
+			FoldFunction<T, ACC> foldFunction,
 			RocksDBKeyedStateBackend<K> backend) {
 
-		super(columnFamily, namespaceSerializer, stateDesc, backend);
+		super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
 
-		this.valueSerializer = stateDesc.getSerializer();
-		this.foldFunction = stateDesc.getFoldFunction();
+		this.foldFunction = foldFunction;
 	}
 
 	@Override
@@ -83,7 +83,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 
 	@Override
 	public TypeSerializer<ACC> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	@Override
@@ -110,7 +110,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 			if (valueBytes == null) {
 				keySerializationStream.reset();
-				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
+				valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out);
 				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			} else {
 				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 69ce95c..2990212 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -1116,89 +1115,71 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
-	 * we don't restore the individual k/v states, just the global RocksDB database and the
-	 * list of column families. When a k/v state is first requested we check here whether we
-	 * already have a column family for that and return it or create a new one if it doesn't exist.
+	 * Registers k/v state information, which includes the state id, type, RocksDB column family handle, and serializers.
 	 *
-	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
-	 * that we checkpointed, i.e. is already in the map of column families.
+	 * <p>When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
+	 * the list of k/v state information. When a k/v state is first requested we check here whether we
+	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
+	 * or create a new one if it does not exist.
 	 */
-	@SuppressWarnings("rawtypes, unchecked")
-	protected <N, S> ColumnFamilyHandle getColumnFamily(
-		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
+	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+			StateDescriptor<?, S> stateDesc,
+			TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {
 
 		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
-			kvStateInformation.get(descriptor.getName());
-
-		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
-			descriptor.getType(),
-			descriptor.getName(),
-			namespaceSerializer,
-			descriptor.getSerializer());
+			kvStateInformation.get(stateDesc.getName());
 
+		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
 		if (stateInfo != null) {
-			// TODO with eager registration in place, these checks should be moved to restore()
 
-			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
-				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
+			@SuppressWarnings("unchecked")
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfoSnapshot =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(stateDesc.getName());
 
 			Preconditions.checkState(
-				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
-				"Incompatible state names. " +
-					"Was [" + restoredMetaInfo.getName() + "], " +
-					"registered with [" + newMetaInfo.getName() + "].");
-
-			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
-				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
-
-				Preconditions.checkState(
-					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
-					"Incompatible state types. " +
-						"Was [" + restoredMetaInfo.getStateType() + "], " +
-						"registered with [" + newMetaInfo.getStateType() + "].");
-			}
+				restoredMetaInfoSnapshot != null,
+				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+					" but its corresponding restored snapshot cannot be found.");
 
-			// check compatibility results to determine if state migration is required
-			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfo.getNamespaceSerializer(),
-				null,
-				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-				newMetaInfo.getNamespaceSerializer());
+			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+				restoredMetaInfoSnapshot,
+				namespaceSerializer,
+				stateDesc);
 
-			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfo.getStateSerializer(),
-				UnloadableDummyTypeSerializer.class,
-				restoredMetaInfo.getStateSerializerConfigSnapshot(),
-				newMetaInfo.getStateSerializer());
+			stateInfo.f1 = newMetaInfo;
+		} else {
+			String stateName = stateDesc.getName();
 
-			if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-				// TODO state migration currently isn't possible.
-				throw new StateMigrationException("State migration isn't supported, yet.");
-			} else {
-				stateInfo.f1 = newMetaInfo;
-				return stateInfo.f0;
-			}
+			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+				stateDesc.getType(),
+				stateName,
+				namespaceSerializer,
+				stateDesc.getSerializer());
+
+			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
+
+			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
+			kvStateInformation.put(stateDesc.getName(), stateInfo);
 		}
 
-		byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+		return Tuple2.of(stateInfo.f0, newMetaInfo);
+	}
+
+	/**
+	 * Creates a column family handle for use with a k/v state.
+	 */
+	private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException {
+		byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
 
 		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
 
-		final ColumnFamilyHandle columnFamily;
-
 		try {
-			columnFamily = db.createColumnFamily(columnDescriptor);
+			return db.createColumnFamily(columnDescriptor);
 		} catch (RocksDBException e) {
 			throw new IOException("Error creating ColumnFamilyHandle.", e);
 		}
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple =
-			new Tuple2<>(columnFamily, newMetaInfo);
-		kvStateInformation.put(descriptor.getName(), tuple);
-		return columnFamily;
 	}
 
 	@Override
@@ -1206,9 +1187,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		ValueStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 
-		return new RocksDBValueState<>(columnFamily, namespaceSerializer,  stateDesc, this);
+		return new RocksDBValueState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				this);
 	}
 
 	@Override
@@ -1216,9 +1203,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		ListStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 
-		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
+		return new RocksDBListState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getElementSerializer(),
+				this);
 	}
 
 	@Override
@@ -1226,9 +1220,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		ReducingStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 
-		return new RocksDBReducingState<>(columnFamily, namespaceSerializer,  stateDesc, this);
+		return new RocksDBReducingState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getReduceFunction(),
+				this);
 	}
 
 	@Override
@@ -1236,8 +1237,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-		return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
+
+		return new RocksDBAggregatingState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getAggregateFunction(),
+				this);
 	}
 
 	@Override
@@ -1245,9 +1254,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 
-		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+		return new RocksDBFoldingState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				stateDesc.getFoldFunction(),
+				this);
 	}
 
 	@Override
@@ -1255,9 +1271,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		TypeSerializer<N> namespaceSerializer,
 		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
+				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 
-		return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
+		return new RocksDBMapState<>(
+				registerResult.f0,
+				registerResult.f1.getNamespaceSerializer(),
+				registerResult.f1.getStateSerializer(),
+				stateDesc.getDefaultValue(),
+				this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index d9bcb6a..e0e5d52 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -47,11 +46,11 @@ import java.util.List;
  * @param <V> The type of the values in the list state.
  */
 public class RocksDBListState<K, N, V>
-		extends AbstractRocksDBState<K, N, List<V>, ListState<V>, ListStateDescriptor<V>>
+		extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
 		implements InternalListState<K, N, V> {
 
 	/** Serializer for the values. */
-	private final TypeSerializer<V> valueSerializer;
+	private final TypeSerializer<V> elementSerializer;
 
 	/**
 	 * Separator of StringAppendTestOperator in RocksDB.
@@ -61,17 +60,23 @@ public class RocksDBListState<K, N, V>
 	/**
 	 * Creates a new {@code RocksDBListState}.
 	 *
+	 * @param columnFamily The RocksDB column family that this state is associated with.
 	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
+	 * @param valueSerializer The serializer for the state.
+	 * @param defaultValue The default value for the state.
+	 * @param elementSerializer The serializer for elements of the list state.
+	 * @param backend The backend to which this state is bound.
 	 */
-	public RocksDBListState(ColumnFamilyHandle columnFamily,
+	public RocksDBListState(
+			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<V> stateDesc,
+			TypeSerializer<List<V>> valueSerializer,
+			List<V> defaultValue,
+			TypeSerializer<V> elementSerializer,
 			RocksDBKeyedStateBackend<K> backend) {
 
-		super(columnFamily, namespaceSerializer, stateDesc, backend);
-		this.valueSerializer = stateDesc.getElementSerializer();
+		super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
+		this.elementSerializer = elementSerializer;
 	}
 
 	@Override
@@ -86,7 +91,7 @@ public class RocksDBListState<K, N, V>
 
 	@Override
 	public TypeSerializer<List<V>> getValueSerializer() {
-		return stateDesc.getSerializer();
+		return valueSerializer;
 	}
 
 	@Override
@@ -105,7 +110,7 @@ public class RocksDBListState<K, N, V>
 
 			List<V> result = new ArrayList<>();
 			while (in.available() > 0) {
-				result.add(valueSerializer.deserialize(in));
+				result.add(elementSerializer.deserialize(in));
 				if (in.available() > 0) {
 					in.readByte();
 				}
@@ -125,7 +130,7 @@ public class RocksDBListState<K, N, V>
 			byte[] key = keySerializationStream.toByteArray();
 			keySerializationStream.reset();
 			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-			valueSerializer.serialize(value, out);
+			elementSerializer.serialize(value, out);
 			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
@@ -227,7 +232,7 @@ public class RocksDBListState<K, N, V>
 			} else {
 				keySerializationStream.write(DELIMITER);
 			}
-			valueSerializer.serialize(value, out);
+			elementSerializer.serialize(value, out);
 		}
 
 		return keySerializationStream.toByteArray();