You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/07 13:33:08 UTC

[flink] 02/05: [FLINK-11073] [state backends] Respect serializer reconfiguration in state backends

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e03cea5431dc63f603469bdbe8c499f8819e3cbd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Dec 13 19:22:46 2018 +0800

    [FLINK-11073] [state backends] Respect serializer reconfiguration in state backends
    
    This commit adapts the StateSerializerProvider to respect the
    COMPATIBLE_WITH_RECONFIGURED_SERIALIZER case, which effectively lets all
    state backends respect serializer reconfiguration because state
    serializers are always obtained via StateSerializerProviders.
    
    It also makes StateSerializerProvider work for both eagerly / lazily
    registered serializers.
    This is required so that the StateSerializerProvider can be used for the
    key serializer in AbstractKeyedStateBackend. For the key serializer,
    regardless of whether or not we're restoring old state, we always first
    get the new key serializer, and then maybe get the previous key
    serializer's snapshot in the restore phase.
    
    Therefore, after this commit, access to the key serializer in
    AbstractKeyedStateBackend is also governed by a StateSerializerProvider.
    
    For a more fine-grained explanation of all the changes that build up to
    this, please refer to #7329.
    
    This closes #7329.
---
 .../runtime/state/AbstractKeyedStateBackend.java   |  18 +-
 .../RegisteredBroadcastStateBackendMetaInfo.java   |   8 +-
 .../RegisteredKeyValueStateBackendMetaInfo.java    |  12 +-
 .../RegisteredOperatorStateBackendMetaInfo.java    |   4 +-
 ...egisteredPriorityQueueStateBackendMetaInfo.java |   4 +-
 .../runtime/state/StateSerializerProvider.java     | 253 ++++---
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  11 +-
 .../state/StateBackendMigrationTestBase.java       | 753 ++++++++++++---------
 .../runtime/state/StateSerializerProviderTest.java | 136 +++-
 .../runtime/testutils/statemigration/TestType.java |  23 +
 .../V1TestTypeSerializerSnapshot.java              |   7 +-
 .../V2TestTypeSerializerSnapshot.java              |  11 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  43 +-
 13 files changed, 820 insertions(+), 463 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 19de210..7422e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -52,8 +54,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	Closeable,
 	CheckpointListener {
 
-	/** {@link TypeSerializer} for our key. */
-	protected final TypeSerializer<K> keySerializer;
+	/** {@link StateSerializerProvider} for our key serializer. */
+	private final StateSerializerProvider<K> keySerializerProvider;
 
 	/** The currently active key. */
 	private K currentKey;
@@ -104,7 +106,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		Preconditions.checkArgument(numberOfKeyGroups >= keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be at least the number in the key group range assigned to this backend");
 
 		this.kvStateRegistry = kvStateRegistry;
-		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.keySerializerProvider = StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
 		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
@@ -156,7 +158,13 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 */
 	@Override
 	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
+		return keySerializerProvider.currentSchemaSerializer();
+	}
+
+	public TypeSerializerSchemaCompatibility<K> checkKeySerializerSchemaCompatibility(
+			TypeSerializerSnapshot<K> previousKeySerializerSnapshot) {
+
+		return keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(previousKeySerializerSnapshot);
 	}
 
 	/**
@@ -230,7 +238,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			final TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, V> stateDescriptor) throws Exception {
 		checkNotNull(namespaceSerializer, "Namespace serializer");
-		checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
+		checkNotNull(keySerializerProvider, "State key serializer has not been configured in the config. " +
 				"This operation cannot use partitioned state.");
 
 		InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index ecc13fa..e44559a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -55,8 +55,8 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta
 		this(
 			name,
 			assignmentMode,
-			StateSerializerProvider.fromNewState(keySerializer),
-			StateSerializerProvider.fromNewState(valueSerializer));
+			StateSerializerProvider.fromNewRegisteredSerializer(keySerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(valueSerializer));
 	}
 
 	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
@@ -73,10 +73,10 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta
 			snapshot.getName(),
 			OperatorStateHandle.Mode.valueOf(
 				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<K>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<V>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index b37c79d..b2d1cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -60,8 +60,8 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat
 		this(
 			stateType,
 			name,
-			StateSerializerProvider.fromNewState(namespaceSerializer),
-			StateSerializerProvider.fromNewState(stateSerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
 			null);
 	}
 
@@ -75,8 +75,8 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat
 		this(
 			stateType,
 			name,
-			StateSerializerProvider.fromNewState(namespaceSerializer),
-			StateSerializerProvider.fromNewState(stateSerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
 			snapshotTransformer);
 	}
 
@@ -85,10 +85,10 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat
 		this(
 			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
 			snapshot.getName(),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<N>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<S>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
 			null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index 921947a..6b83ca2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -56,7 +56,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe
 			@Nonnull OperatorStateHandle.Mode assignmentMode) {
 		this(
 			name,
-			StateSerializerProvider.fromNewState(partitionStateSerializer),
+			StateSerializerProvider.fromNewRegisteredSerializer(partitionStateSerializer),
 			assignmentMode);
 	}
 
@@ -71,7 +71,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe
 	public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
 		this(
 			snapshot.getName(),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<S>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
 			OperatorStateHandle.Mode.valueOf(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 961d96f..691e74c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -42,14 +42,14 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredSt
 		@Nonnull String name,
 		@Nonnull TypeSerializer<T> elementSerializer) {
 
-		this(name, StateSerializerProvider.fromNewState(elementSerializer));
+		this(name, StateSerializerProvider.fromNewRegisteredSerializer(elementSerializer));
 	}
 
 	@SuppressWarnings("unchecked")
 	public RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
 		this(
 			snapshot.getName(),
-			StateSerializerProvider.fromRestoredState(
+			StateSerializerProvider.fromPreviousSerializerSnapshot(
 				(TypeSerializerSnapshot<T>) Preconditions.checkNotNull(
 					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index a24f12e..bad37ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -35,6 +35,21 @@ import static org.apache.flink.util.Preconditions.checkState;
  * A {@link StateSerializerProvider} wraps logic on how to obtain serializers for registered state,
  * either with the previous schema of state in checkpoints or the current schema of state.
  *
+ * <p>A provider can be created from either a registered state serializer, or the snapshot
+ * of the previous state serializer. For the former case, if the state was restored and a
+ * snapshot of the previous state serializer was retrieved later on, the snapshot can be set
+ * on the provider which also additionally checks the compatibility of the initially registered
+ * serializer. Similarly for the latter case, if a new state serializer is registered later on,
+ * it can be set on the provider, which then also checks the compatibility of the new registered
+ * serializer.
+ *
+ * <p>Simply put, the provider works both directions - either creating it first with a registered
+ * serializer or the previous serializer's snapshot, and then setting the previous serializer's
+ * snapshot (if the provider was created with a registered serializer) or a new registered state
+ * serializer (if the provider was created with a serializer snapshot). Either way,
+ * the new registered serializer is checked for schema compatibility once both the new serializer
+ * and the previous serializer snapshot are present.
+ *
  * @param <T> the type of the state.
  */
 @Internal
@@ -44,13 +59,37 @@ public abstract class StateSerializerProvider<T> {
 	 * The registered serializer for the state.
 	 *
 	 * <p>In the case that this provider was created from a restored serializer snapshot via
-	 * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new serializer was never registered
+	 * {@link #fromPreviousSerializerSnapshot(TypeSerializerSnapshot)}, but a new serializer was never registered
 	 * for the state (i.e., this is the case if a restored state was never accessed), this would be {@code null}.
 	 */
 	@Nullable
 	TypeSerializer<T> registeredSerializer;
 
 	/**
+	 * The state's previous serializer's snapshot.
+	 *
+	 * <p>In the case that this provider was created from a registered state serializer instance via
+	 * {@link #fromNewRegisteredSerializer(TypeSerializer)}, but a serializer snapshot was never supplied to this
+	 * provider (i.e. because the registered serializer was for a new state, not a restored one), this
+	 * would be {@code null}.
+	 */
+	@Nullable
+	TypeSerializerSnapshot<T> previousSerializerSnapshot;
+
+	/**
+	 * The restore serializer, lazily created only when the restore serializer is accessed.
+	 *
+	 * <p>NOTE: It is important to only create this lazily, so that off-heap
+	 * state do not fail eagerly when restoring state that has a
+	 * {@link UnloadableDummyTypeSerializer} as the previous serializer. This should
+	 * be relevant only for restores from Flink versions prior to 1.7.x.
+	 */
+	@Nullable
+	private TypeSerializer<T> cachedRestoredSerializer;
+
+	private boolean isRegisteredWithIncompatibleSerializer = false;
+
+	/**
 	 * Creates a {@link StateSerializerProvider} for restored state from the previous serializer's snapshot.
 	 *
 	 * <p>Once a new serializer is registered for the state, it should be provided via
@@ -59,26 +98,36 @@ public abstract class StateSerializerProvider<T> {
 	 * @param stateSerializerSnapshot the previous serializer's snapshot.
 	 * @param <T> the type of the state.
 	 *
-	 * @return a new {@link StateSerializerProvider} for restored state.
+	 * @return a new {@link StateSerializerProvider}.
 	 */
-	public static <T> StateSerializerProvider<T> fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
-		return new RestoredStateSerializerProvider<>(stateSerializerSnapshot);
+	public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
+		return new LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot);
 	}
 
 	/**
-	 * Creates a {@link StateSerializerProvider} for new state from the registered state serializer.
+	 * Creates a {@link StateSerializerProvider} from the registered state serializer.
+	 *
+	 * <p>If the state is a restored one, and the previous serializer's snapshot is
+	 * obtained later on, it should be supplied via the
+	 * {@link #setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)} method.
 	 *
 	 * @param registeredStateSerializer the new state's registered serializer.
 	 * @param <T> the type of the state.
 	 *
-	 * @return a new {@link StateSerializerProvider} for new state.
+	 * @return a new {@link StateSerializerProvider}.
 	 */
-	public static <T> StateSerializerProvider<T> fromNewState(TypeSerializer<T> registeredStateSerializer) {
-		return new NewStateSerializerProvider<>(registeredStateSerializer);
+	public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer) {
+		return new EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer);
 	}
 
-	private StateSerializerProvider(@Nullable TypeSerializer<T> stateSerializer) {
+	private StateSerializerProvider(@Nonnull TypeSerializer<T> stateSerializer) {
 		this.registeredSerializer = stateSerializer;
+		this.previousSerializerSnapshot = null;
+	}
+
+	private StateSerializerProvider(@Nonnull TypeSerializerSnapshot<T> previousSerializerSnapshot) {
+		this.previousSerializerSnapshot = previousSerializerSnapshot;
+		this.registeredSerializer = null;
 	}
 
 	/**
@@ -92,36 +141,63 @@ public abstract class StateSerializerProvider<T> {
 	 * identical. Therefore, in this case, it is guaranteed that the serializer returned by
 	 * this method is the same as the one returned by {@link #previousSchemaSerializer()}.
 	 *
-	 * <p>If this provider was created from new state, then this always returns the
-	 * serializer that the new state was registered with.
+	 * <p>If this provider was created from a serializer instance, then this always returns
+	 * that same serializer instance. If later on a snapshot of the previous serializer is supplied
+	 * via {@link #setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}, then
+	 * the initially supplied serializer instance will be checked for compatibility.
 	 *
 	 * @return a serializer that reads and writes in the current schema of the state.
 	 */
 	@Nonnull
-	public abstract TypeSerializer<T> currentSchemaSerializer();
+	public final TypeSerializer<T> currentSchemaSerializer() {
+		if (registeredSerializer != null) {
+			checkState(
+				!isRegisteredWithIncompatibleSerializer,
+				"Unable to provide a serializer with the current schema, because the restored state was " +
+					"registered with a new serializer that has incompatible schema.");
+
+			return registeredSerializer;
+		}
+
+		// if we are not yet registered with a new serializer,
+		// we can just use the restore serializer to read / write the state.
+		return previousSchemaSerializer();
+	};
 
 	/**
 	 * Gets the serializer that recognizes the previous serialization schema of the state.
 	 * This is the serializer that should be used for restoring the state, i.e. when the state
 	 * is still in the previous serialization schema.
 	 *
-	 * <p>This method can only be used if this provider was created from a restored state's serializer
-	 * snapshot. If this provider was created from new state, then this method is
-	 * irrelevant, since there doesn't exist any previous version of the state schema.
+	 * <p>This method only returns a serializer if this provider has the previous serializer's
+	 * snapshot. Otherwise, trying to access the previous schema serializer will fail
+	 * with an exception.
 	 *
 	 * @return a serializer that reads and writes in the previous schema of the state.
 	 */
 	@Nonnull
-	public abstract TypeSerializer<T> previousSchemaSerializer();
+	public final TypeSerializer<T> previousSchemaSerializer() {
+		if (cachedRestoredSerializer != null) {
+			return cachedRestoredSerializer;
+		}
+
+		if (previousSerializerSnapshot == null) {
+			throw new UnsupportedOperationException(
+				"This provider does not contain the state's previous serializer's snapshot. Cannot provider a serializer for previous schema.");
+		}
+
+		this.cachedRestoredSerializer = previousSerializerSnapshot.restoreSerializer();
+		return cachedRestoredSerializer;
+	};
 
 	/**
 	 * For restored state, register a new serializer that potentially has a new serialization schema.
 	 *
 	 * <p>Users are allowed to register serializers for state only once. Therefore, this method
-	 * is irrelevant if this provider was created from new state, since a state serializer had
+	 * is irrelevant if this provider was created with a serializer instance, since a state serializer had
 	 * been registered already.
 	 *
-	 * <p>For the case where this provider was created from restored state, then this method should
+	 * <p>For the case where this provider was created from a serializer snapshot, then this method should
 	 * be called at most once. The new serializer will be checked for its schema compatibility with the
 	 * previous serializer's schema, and returned to the caller. The caller is responsible for
 	 * checking the result and react appropriately to it, as follows:
@@ -143,52 +219,61 @@ public abstract class StateSerializerProvider<T> {
 	public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer);
 
 	/**
-	 * Implementation of the {@link StateSerializerProvider} for the restored state case.
+	 * For restored state, set the state's previous serializer's snapshot.
+	 *
+	 * <p>Users are allowed to set the previous serializer's snapshot once. Therefore, this method
+	 * is irrelevant if this provider was created with a serializer snapshot, since the serializer
+	 * snapshot had been set already.
+	 *
+	 * <p>For the case where this provider was created from a serializer instance, then this method should
+	 * be called at most once. The initially registered state serializer will be checked for its
+	 * schema compatibility with the previous serializer's schema, and returned to the caller.
+	 * The caller is responsible for checking the result and react appropriately to it, as follows:
+	 * <ul>
+	 *     <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done.
+	 *     {@link #currentSchemaSerializer()} remains to return the initially registered serializer.</li>
+	 *     <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state needs to be
+	 *     migrated before the serializer returned by {@link #currentSchemaSerializer()} can be used.
+	 *     The migration should be performed by reading the state with {@link #previousSchemaSerializer()},
+	 *     and then writing it again with {@link #currentSchemaSerializer()}.</li>
+	 *     <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}: the registered serializer is
+	 *     incompatible. {@link #currentSchemaSerializer()} can no longer return a serializer for
+	 *     the state, and therefore this provider shouldn't be used anymore.</li>
+	 * </ul>
+	 *
+	 * @param previousSerializerSnapshot the state's previous serializer's snapshot
+	 *
+	 * @return the schema compatibility of the initially registered serializer, with respect to the previous serializer.
 	 */
-	private static class RestoredStateSerializerProvider<T> extends StateSerializerProvider<T> {
-
-		/**
-		 * The snapshot of the previous serializer of the state.
-		 */
-		@Nonnull
-		private final TypeSerializerSnapshot<T> previousSerializerSnapshot;
-
-		private boolean isRegisteredWithIncompatibleSerializer = false;
-
-		RestoredStateSerializerProvider(TypeSerializerSnapshot<T> previousSerializerSnapshot) {
-			super(null);
-			this.previousSerializerSnapshot = Preconditions.checkNotNull(previousSerializerSnapshot);
-		}
-
-		/**
-		 * The restore serializer, lazily created only when the restore serializer is accessed.
-		 *
-		 * <p>NOTE: It is important to only create this lazily, so that off-heap
-		 * state do not fail eagerly when restoring state that has a
-		 * {@link UnloadableDummyTypeSerializer} as the previous serializer. This should
-		 * be relevant only for restores from Flink versions prior to 1.7.x.
-		 */
-		@Nullable
-		private TypeSerializer<T> cachedRestoredSerializer;
+	@Nonnull
+	public abstract TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot);
 
-		@Override
-		@Nonnull
-		public TypeSerializer<T> currentSchemaSerializer() {
-			if (registeredSerializer != null) {
-				checkState(
-					!isRegisteredWithIncompatibleSerializer,
-					"Unable to provide a serializer with the current schema, because the restored state was " +
-						"registered with a new serializer that has incompatible schema.");
+	/**
+	 * Invalidates access to the current schema serializer. This lets {@link #currentSchemaSerializer()}
+	 * fail when invoked.
+	 *
+	 * <p>Access to the current schema serializer should be invalidated by the methods
+	 * {@link #registerNewSerializerForRestoredState(TypeSerializer)} or
+	 * {@link #setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}
+	 * once the registered serializer is determined to be incompatible.
+	 */
+	protected final void invalidateCurrentSchemaSerializerAccess() {
+		this.isRegisteredWithIncompatibleSerializer = true;
+	}
 
-					return registeredSerializer;
-			}
+	/**
+	 * Implementation of the {@link StateSerializerProvider} for the case where a snapshot of the
+	 * previous serializer is obtained before a new state serializer is registered (hence, the naming "lazily" registered).
+	 */
+	private static class LazilyRegisteredStateSerializerProvider<T> extends StateSerializerProvider<T> {
 
-			// if we are not yet registered with a new serializer,
-			// we can just use the restore serializer to read / write the state.
-			return previousSchemaSerializer();
+		LazilyRegisteredStateSerializerProvider(TypeSerializerSnapshot<T> previousSerializerSnapshot) {
+			super(Preconditions.checkNotNull(previousSerializerSnapshot));
 		}
 
 		@Nonnull
+		@Override
+		@SuppressWarnings("ConstantConditions")
 		public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
 			checkNotNull(newSerializer);
 			if (registeredSerializer != null) {
@@ -197,49 +282,59 @@ public abstract class StateSerializerProvider<T> {
 
 			TypeSerializerSchemaCompatibility<T> result = previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
 			if (result.isIncompatible()) {
-				this.isRegisteredWithIncompatibleSerializer = true;
+				invalidateCurrentSchemaSerializerAccess();
+			}
+			if (result.isCompatibleWithReconfiguredSerializer()) {
+				this.registeredSerializer = result.getReconfiguredSerializer();
+			} else {
+				this.registeredSerializer = newSerializer;
 			}
-			this.registeredSerializer = newSerializer;
 			return result;
 		}
 
 		@Nonnull
-		public final TypeSerializer<T> previousSchemaSerializer() {
-			if (cachedRestoredSerializer != null) {
-				return cachedRestoredSerializer;
-			}
-
-			this.cachedRestoredSerializer = previousSerializerSnapshot.restoreSerializer();
-			return cachedRestoredSerializer;
+		@Override
+		public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(
+				TypeSerializerSnapshot<T> previousSerializerSnapshot) {
+			throw new UnsupportedOperationException("The snapshot of the state's previous serializer has already been set; cannot reset.");
 		}
 	}
 
 	/**
-	 * Implementation of the {@link StateSerializerProvider} for the new state case.
+	 * Implementation of the {@link StateSerializerProvider} for the case where a new state
+	 * serializer instance is registered first, before any snapshot of the previous state serializer
+	 * is obtained (hence, the naming "eagerly" registered).
 	 */
-	private static class NewStateSerializerProvider<T> extends StateSerializerProvider<T> {
+	private static class EagerlyRegisteredStateSerializerProvider<T> extends StateSerializerProvider<T> {
 
-		NewStateSerializerProvider(TypeSerializer<T> registeredStateSerializer) {
+		EagerlyRegisteredStateSerializerProvider(TypeSerializer<T> registeredStateSerializer) {
 			super(Preconditions.checkNotNull(registeredStateSerializer));
 		}
 
-		@Override
 		@Nonnull
-		@SuppressWarnings("ConstantConditions")
-		public TypeSerializer<T> currentSchemaSerializer() {
-			return registeredSerializer;
-		}
-
 		@Override
-		@Nonnull
 		public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
 			throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
 		}
 
-		@Override
 		@Nonnull
-		public TypeSerializer<T> previousSchemaSerializer() {
-			throw new UnsupportedOperationException("This is a NewStateSerializerProvider; you cannot get a restore serializer because there was no restored state.");
+		@Override
+		public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot) {
+			checkNotNull(previousSerializerSnapshot);
+			if (this.previousSerializerSnapshot != null) {
+				throw new UnsupportedOperationException("The snapshot of the state's previous serializer has already been set; cannot reset.");
+			}
+
+			this.previousSerializerSnapshot = previousSerializerSnapshot;
+
+			TypeSerializerSchemaCompatibility<T> result = previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
+			if (result.isIncompatible()) {
+				invalidateCurrentSchemaSerializerAccess();
+			}
+			if (result.isCompatibleWithReconfiguredSerializer()) {
+				this.registeredSerializer = result.getReconfiguredSerializer();
+			}
+			return result;
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 3f8761b..c957414 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -243,7 +243,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
 				restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
-			if (!namespaceCompatibility.isCompatibleAsIs()) {
+			if (namespaceCompatibility.isCompatibleAfterMigration() || namespaceCompatibility.isIncompatible()) {
 				throw new StateMigrationException("For heap backends, the new namespace serializer must be compatible.");
 			}
 
@@ -302,7 +302,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 		StateTable<K, N, SV> stateTable = tryRegisterStateTable(
 			namespaceSerializer, stateDesc, getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
-		return stateFactory.createState(stateDesc, stateTable, keySerializer);
+		return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
 	}
 
 	@SuppressWarnings("unchecked")
@@ -394,8 +394,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				if (!keySerializerRestored) {
 					// check for key serializer compatibility; this also reconfigures the
 					// key serializer to be compatible, if it is required and is possible
-					if (!serializationProxy.getKeySerializerConfigSnapshot()
-							.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+					TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
+						checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
+					if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
 						throw new StateMigrationException("The new key serializer must be compatible.");
 					}
 
@@ -700,7 +701,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				new KeyedBackendSerializationProxy<>(
 					// TODO: this code assumes that writing a serializer is threadsafe, we should support to
 					// get a serialized form already at state registration time in the future
-					keySerializer,
+					getKeySerializer(),
 					metaInfoSnapshots,
 					!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 5511792..8d85e74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -69,23 +70,70 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 	private CheckpointStorageLocation checkpointStorageLocation;
 
 	// -------------------------------------------------------------------------------
-	//  Keyed state backend migration tests
+	//  Tests for keyed ValueState
 	// -------------------------------------------------------------------------------
 
 	@Test
 	public void testKeyedValueStateMigration() throws Exception {
+		final String stateName = "test-name";
+
+		testKeyedValueStateUpgrade(
+			new ValueStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ValueStateDescriptor<>(
+				stateName,
+				// restore with a V2 serializer that has a different schema
+				new TestType.V2TestTypeSerializer()));
+	}
+
+	@Test
+	public void testKeyedValueStateSerializerReconfiguration() throws Exception {
+		final String stateName = "test-name";
+
+		testKeyedValueStateUpgrade(
+			new ValueStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ValueStateDescriptor<>(
+				stateName,
+				// the test fails if this serializer is used instead of a reconfigured new serializer
+				new TestType.ReconfigurationRequiringTestTypeSerializer()));
+	}
+
+	@Test
+	public void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
+		final String stateName = "test-name";
+
+		try {
+			testKeyedValueStateUpgrade(
+				new ValueStateDescriptor<>(
+					stateName,
+					new TestType.V1TestTypeSerializer()),
+				new ValueStateDescriptor<>(
+					stateName,
+					new TestType.IncompatibleTestTypeSerializer()));
+
+			Assert.fail("should have failed");
+		} catch (Exception expected) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
+		}
+	}
+
+	private void testKeyedValueStateUpgrade(
+			ValueStateDescriptor<TestType> initialAccessDescriptor,
+			ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
-		final String stateName = "test-name";
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		try {
-			ValueStateDescriptor<TestType> kvId = new ValueStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ValueState<TestType> valueState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			ValueState<TestType> valueState = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				CustomVoidNamespaceSerializer.INSTANCE,
+				initialAccessDescriptor);
 
 			backend.setCurrentKey(1);
 			valueState.update(new TestType("foo", 1456));
@@ -101,16 +149,13 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
 
-			// the new serializer is V2, and has a completely new serialization schema.
-			kvId = new ValueStateDescriptor<>(
-				stateName,
-				new TestType.V2TestTypeSerializer());
-			valueState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			valueState = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				CustomVoidNamespaceSerializer.INSTANCE,
+				newAccessDescriptorAfterRestore);
 
 			snapshot.discardState();
 
-			// the state backend should have decided whether or not it needs to perform state migration;
 			// make sure that reading and writing each key state works with the new serializer
 			backend.setCurrentKey(1);
 			Assert.assertEquals(new TestType("foo", 1456), valueState.value());
@@ -128,20 +173,72 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 		}
 	}
 
+	// -------------------------------------------------------------------------------
+	//  Tests for keyed ListState
+	// -------------------------------------------------------------------------------
+
 	@Test
 	public void testKeyedListStateMigration() throws Exception {
+		final String stateName = "test-name";
+
+		testKeyedListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// restore with a V2 serializer that has a different schema
+				new TestType.V2TestTypeSerializer()));
+	}
+
+	@Test
+	@Ignore("This currently doesn't pass because the ListSerializer doesn't respect the reconfigured case, yet.")
+	public void testKeyedListStateSerializerReconfiguration() throws Exception {
+		final String stateName = "test-name";
+
+		testKeyedListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// the test fails if this serializer is used instead of a reconfigured new serializer
+				new TestType.ReconfigurationRequiringTestTypeSerializer()));
+	}
+
+	@Test
+	public void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
+		final String stateName = "test-name";
+
+		try {
+			testKeyedListStateUpgrade(
+				new ListStateDescriptor<>(
+					stateName,
+					new TestType.V1TestTypeSerializer()),
+				new ListStateDescriptor<>(
+					stateName,
+					new TestType.IncompatibleTestTypeSerializer()));
+
+			Assert.fail("should have failed");
+		} catch (Exception expected) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
+		}
+	}
+
+	private void testKeyedListStateUpgrade(
+			ListStateDescriptor<TestType> initialAccessDescriptor,
+			ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
-		final String stateName = "test-name";
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		try {
-			ListStateDescriptor<TestType> kvId = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> listState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			ListState<TestType> listState = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				CustomVoidNamespaceSerializer.INSTANCE,
+				initialAccessDescriptor);
 
 			backend.setCurrentKey(1);
 			listState.add(new TestType("key-1", 1));
@@ -162,16 +259,13 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
 
-			// the new serializer is V2, and has a completely new serialization schema.
-			kvId = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V2TestTypeSerializer());
-			listState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			listState = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				CustomVoidNamespaceSerializer.INSTANCE,
+				newAccessDescriptorAfterRestore);
 
 			snapshot.discardState();
 
-			// the state backend should have decided whether or not it needs to perform state migration;
 			// make sure that reading and writing each key state works with the new serializer
 			backend.setCurrentKey(1);
 			Iterator<TestType> iterable1 = listState.get().iterator();
@@ -198,27 +292,24 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 		}
 	}
 
+	// -------------------------------------------------------------------------------
+	//  Tests for keyed priority queue state
+	// -------------------------------------------------------------------------------
+
 	@Test
-	public void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
+	public void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
-		final String stateName = "test-name";
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		try {
-			ValueStateDescriptor<TestType> kvId = new ValueStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ValueState<TestType> valueState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			InternalPriorityQueue<TestType> internalPriorityQueue = backend.create(
+				"testPriorityQueue", new TestType.V1TestTypeSerializer());
 
-			backend.setCurrentKey(1);
-			valueState.update(new TestType("foo", 1456));
-			backend.setCurrentKey(2);
-			valueState.update(new TestType("bar", 478));
-			backend.setCurrentKey(3);
-			valueState.update(new TestType("hello", 189));
+			internalPriorityQueue.add(new TestType("key-1", 123));
+			internalPriorityQueue.add(new TestType("key-2", 346));
+			internalPriorityQueue.add(new TestType("key-1", 777));
 
 			KeyedStateHandle snapshot = runSnapshot(
 				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
@@ -226,110 +317,64 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			backend.dispose();
 
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-
-			kvId = new ValueStateDescriptor<>(
-				stateName,
-				new TestType.IncompatibleTestTypeSerializer());
-
-			// the new serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			backend.create(
+				"testPriorityQueue", new TestType.IncompatibleTestTypeSerializer());
 
 			Assert.fail("should have failed");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-		}finally {
+		} finally {
 			backend.dispose();
 		}
 	}
 
-	@Test
-	public void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
-		final String stateName = "test-name";
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	// -------------------------------------------------------------------------------
+	//  Tests for key serializer in keyed state backends
+	// -------------------------------------------------------------------------------
 
+	@Test
+	public void testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws Exception {
 		try {
-			ListStateDescriptor<TestType> kvId = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> listState = backend
-				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
-
-			backend.setCurrentKey(1);
-			listState.add(new TestType("key-1", 1));
-			listState.add(new TestType("key-1", 2));
-			listState.add(new TestType("key-1", 3));
-
-			backend.setCurrentKey(2);
-			listState.add(new TestType("key-2", 1));
-
-			backend.setCurrentKey(3);
-			listState.add(new TestType("key-3", 1));
-			listState.add(new TestType("key-3", 2));
-
-			KeyedStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-			backend.dispose();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-
-			kvId = new ListStateDescriptor<>(
-				stateName,
-				new TestType.IncompatibleTestTypeSerializer());
-
-			// the new serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+			testKeySerializerUpgrade(
+				new TestType.V1TestTypeSerializer(),
+				new TestType.V2TestTypeSerializer());
 
 			Assert.fail("should have failed");
-		} catch (Exception e) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-		} finally {
-			backend.dispose();
+		} catch (Exception expected) {
+			// the new key serializer requires migration; this should fail the restore
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
 		}
 	}
 
 	@Test
-	public void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration() throws Exception {
+		testKeySerializerUpgrade(
+			new TestType.V1TestTypeSerializer(),
+			new TestType.ReconfigurationRequiringTestTypeSerializer());
+	}
 
+	@Test
+	public void testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception {
 		try {
-			InternalPriorityQueue<TestType> internalPriorityQueue = backend.create(
-				"testPriorityQueue", new TestType.V1TestTypeSerializer());
-
-			internalPriorityQueue.add(new TestType("key-1", 123));
-			internalPriorityQueue.add(new TestType("key-2", 346));
-			internalPriorityQueue.add(new TestType("key-1", 777));
-
-			KeyedStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-			backend.dispose();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-			backend.create(
-				"testPriorityQueue", new TestType.IncompatibleTestTypeSerializer());
+			testKeySerializerUpgrade(
+				new TestType.V1TestTypeSerializer(),
+				new TestType.IncompatibleTestTypeSerializer());
 
 			Assert.fail("should have failed");
-		} catch (Exception e) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-		} finally {
-			backend.dispose();
+		} catch (Exception expected) {
+			// the new key serializer is incompatible; this should fail the restore
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
 		}
 	}
 
-	@Test
-	public void testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() throws Exception {
+	private void testKeySerializerUpgrade(
+			TypeSerializer<TestType> initialKeySerializer,
+			TypeSerializer<TestType> newKeySerializer) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
-		AbstractKeyedStateBackend<TestType> backend = createKeyedBackend(
-			new TestType.V1TestTypeSerializer());
+		AbstractKeyedStateBackend<TestType> backend = createKeyedBackend(initialKeySerializer);
 
 		final String stateName = "test-name";
 		try {
@@ -347,30 +392,66 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 				sharedStateRegistry);
 			backend.dispose();
 
-			try {
-				// the new key serializer is incompatible; this should fail the restore
-				restoreKeyedBackend(new TestType.IncompatibleTestTypeSerializer(), snapshot);
+			backend = restoreKeyedBackend(newKeySerializer, snapshot);
 
-				Assert.fail("should have failed");
-			} catch (Exception e) {
-				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
+			valueState = backend
+				.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
-			try {
-				// the new key serializer requires migration; this should fail the restore
-				restoreKeyedBackend(new TestType.V2TestTypeSerializer(), snapshot);
+			// access and check previous state
+			backend.setCurrentKey(new TestType("foo", 123));
+			Assert.assertEquals(1, valueState.value().intValue());
+			backend.setCurrentKey(new TestType("bar", 456));
+			Assert.assertEquals(5, valueState.value().intValue());
 
-				Assert.fail("should have failed");
-			} catch (Exception e) {
-				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
+			snapshot.discardState();
 		} finally {
 			backend.dispose();
 		}
 	}
 
+	// -------------------------------------------------------------------------------
+	//  Tests for namespace serializer in keyed state backends
+	// -------------------------------------------------------------------------------
+
 	@Test
-	public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatible() throws Exception {
+	public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration() throws Exception {
+		try {
+			testNamespaceSerializerUpgrade(
+				new TestType.V1TestTypeSerializer(),
+				new TestType.V2TestTypeSerializer());
+
+			Assert.fail("should have failed");
+		} catch (Exception expected) {
+			// the new namespace serializer requires migration; this should fail the state registration
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
+		}
+	}
+
+	@Test
+	public void testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration() throws Exception {
+		testNamespaceSerializerUpgrade(
+			new TestType.V1TestTypeSerializer(),
+			new TestType.ReconfigurationRequiringTestTypeSerializer());
+	}
+
+	@Test
+	public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws Exception {
+		try {
+			testNamespaceSerializerUpgrade(
+				new TestType.V1TestTypeSerializer(),
+				new TestType.IncompatibleTestTypeSerializer());
+
+			Assert.fail("should have failed");
+		} catch (Exception expected) {
+			// the new namespace serializer is incompatible; this should fail the state registration
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, StateMigrationException.class).isPresent());
+		}
+	}
+
+	private void testNamespaceSerializerUpgrade(
+			TypeSerializer<TestType> initialNamespaceSerializer,
+			TypeSerializer<TestType> newNamespaceSerializerAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
@@ -382,7 +463,7 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			ValueState<Integer> valueState = backend
 				.getPartitionedState(
 					new TestType("namespace", 123),
-					new TestType.V1TestTypeSerializer(),
+					initialNamespaceSerializer,
 					kvId);
 
 			backend.setCurrentKey(1);
@@ -397,53 +478,87 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			// test incompatible namespace serializer; start with a freshly restored backend
 			backend.dispose();
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-			try {
-				// the new namespace serializer is incompatible; this should fail the restore
-				backend.getPartitionedState(
-					new TestType("namespace", 123),
-					new TestType.IncompatibleTestTypeSerializer(),
-					kvId);
 
-				Assert.fail("should have failed");
-			} catch (Exception e) {
-				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
+			valueState = backend.getPartitionedState(
+				new TestType("namespace", 123),
+				newNamespaceSerializerAfterRestore,
+				kvId);
 
-			// test namespace serializer that requires migration; start with a freshly restored backend
-			backend.dispose();
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-			try {
-				// the new namespace serializer requires migration; this should fail the restore
-				backend.getPartitionedState(
-					new TestType("namespace", 123),
-					new TestType.V2TestTypeSerializer(),
-					kvId);
+			// access and check previous state
+			backend.setCurrentKey(1);
+			Assert.assertEquals(10, valueState.value().intValue());
+			valueState.update(10);
+			backend.setCurrentKey(5);
+			Assert.assertEquals(50, valueState.value().intValue());
 
-				Assert.fail("should have failed");
-			} catch (Exception e) {
-				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
+			snapshot.discardState();
 		} finally {
 			backend.dispose();
 		}
 	}
 
 	// -------------------------------------------------------------------------------
-	//  Operator state backend migration tests
+	//  Operator state backend partitionable list state tests
 	// -------------------------------------------------------------------------------
 
 	@Test
 	public void testOperatorParitionableListStateMigration() throws Exception {
+		final String stateName = "partitionable-list-state";
+
+		testOperatorPartitionableListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// restore with a V2 serializer that has a different schema
+				new TestType.V2TestTypeSerializer()));
+	}
+
+	@Test
+	public void testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
+		final String stateName = "partitionable-list-state";
+
+		testOperatorPartitionableListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// restore with a new serializer that requires reconfiguration
+				new TestType.ReconfigurationRequiringTestTypeSerializer()));
+	}
+
+	@Test
+	public void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
+		final String stateName = "partitionable-list-state";
+
+		try {
+			testOperatorPartitionableListStateUpgrade(
+				new ListStateDescriptor<>(
+					stateName,
+					new TestType.V1TestTypeSerializer()),
+				new ListStateDescriptor<>(
+					stateName,
+					// restore with a new incompatible serializer
+					new TestType.IncompatibleTestTypeSerializer()));
+
+			Assert.fail("should have failed.");
+		} catch (Exception e) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
+		}
+	}
+
+	private void testOperatorPartitionableListStateUpgrade(
+			ListStateDescriptor<TestType> initialAccessDescriptor,
+			ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 
 		OperatorStateBackend backend = createOperatorStateBackend();
 
-		final String stateName = "partitionable-list-state";
 		try {
-			ListStateDescriptor<TestType> descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> state = backend.getListState(descriptor);
+			ListState<TestType> state = backend.getListState(initialAccessDescriptor);
 
 			state.add(new TestType("foo", 13));
 			state.add(new TestType("bar", 278));
@@ -454,12 +569,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreOperatorStateBackend(snapshot);
 
-			descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V2TestTypeSerializer());
-			state = backend.getListState(descriptor);
+			state = backend.getListState(newAccessDescriptorAfterRestore);
 
-			// the state backend should have decided whether or not it needs to perform state migration;
 			// make sure that reading and writing each state partition works with the new serializer
 			Iterator<TestType> iterator = state.get().iterator();
 			Assert.assertEquals(new TestType("foo", 13), iterator.next());
@@ -471,18 +582,69 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 		}
 	}
 
+	// -------------------------------------------------------------------------------
+	//  Operator state backend union list state tests
+	// -------------------------------------------------------------------------------
+
+	@Test
+	public void testOperatorUnionListStateMigration() throws Exception {
+		final String stateName = "union-list-state";
+
+		testOperatorUnionListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// restore with a V2 serializer that has a different schema
+				new TestType.V2TestTypeSerializer()));
+	}
+
 	@Test
-	public void testUnionListStateMigration() throws Exception {
+	public void testOperatorUnionListStateSerializerReconfiguration() throws Exception {
+		final String stateName = "union-list-state";
+
+		testOperatorUnionListStateUpgrade(
+			new ListStateDescriptor<>(
+				stateName,
+				new TestType.V1TestTypeSerializer()),
+			new ListStateDescriptor<>(
+				stateName,
+				// restore with a new serializer that requires reconfiguration
+				new TestType.ReconfigurationRequiringTestTypeSerializer()));
+	}
+
+
+	@Test
+	public void testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
+		final String stateName = "union-list-state";
+
+		try {
+			testOperatorUnionListStateUpgrade(
+				new ListStateDescriptor<>(
+					stateName,
+					new TestType.V1TestTypeSerializer()),
+				new ListStateDescriptor<>(
+					stateName,
+					// restore with a new incompatible serializer
+					new TestType.IncompatibleTestTypeSerializer()));
+
+			Assert.fail("should have failed.");
+		} catch (Exception e) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
+		}
+	}
+
+	private void testOperatorUnionListStateUpgrade(
+			ListStateDescriptor<TestType> initialAccessDescriptor,
+			ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 
 		OperatorStateBackend backend = createOperatorStateBackend();
 
-		final String stateName = "union-list-state";
 		try {
-			ListStateDescriptor<TestType> descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> state = backend.getUnionListState(descriptor);
+			ListState<TestType> state = backend.getUnionListState(initialAccessDescriptor);
 
 			state.add(new TestType("foo", 13));
 			state.add(new TestType("bar", 278));
@@ -493,10 +655,7 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreOperatorStateBackend(snapshot);
 
-			descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V2TestTypeSerializer());
-			state = backend.getUnionListState(descriptor);
+			state = backend.getUnionListState(newAccessDescriptorAfterRestore);
 
 			// the state backend should have decided whether or not it needs to perform state migration;
 			// make sure that reading and writing each state partition works with the new serializer
@@ -510,171 +669,128 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 		}
 	}
 
+	// -------------------------------------------------------------------------------
+	//  Operator state backend broadcast state tests
+	// -------------------------------------------------------------------------------
+
 	@Test
 	public void testBroadcastStateValueMigration() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-
-		OperatorStateBackend backend = createOperatorStateBackend();
-
 		final String stateName = "broadcast-state";
-		try {
-			MapStateDescriptor<Integer, TestType> descriptor = new MapStateDescriptor<>(
+
+		testBroadcastStateValueUpgrade(
+			new MapStateDescriptor<>(
 				stateName,
 				IntSerializer.INSTANCE,
-				new TestType.V1TestTypeSerializer());
-			BroadcastState<Integer, TestType> state = backend.getBroadcastState(descriptor);
-
-			state.put(3, new TestType("foo", 13));
-			state.put(5, new TestType("bar", 278));
-
-			OperatorStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
-			backend.dispose();
-
-			backend = restoreOperatorStateBackend(snapshot);
-
-			descriptor = new MapStateDescriptor<>(
+				new TestType.V1TestTypeSerializer()),
+			new MapStateDescriptor<>(
 				stateName,
 				IntSerializer.INSTANCE,
-				new TestType.V2TestTypeSerializer());
-			state = backend.getBroadcastState(descriptor);
-
-			// the state backend should have decided whether or not it needs to perform state migration;
-			// make sure that reading and writing each broadcast entry works with the new serializer
-			Assert.assertEquals(new TestType("foo", 13), state.get(3));
-			Assert.assertEquals(new TestType("bar", 278), state.get(5));
-			state.put(17, new TestType("new-entry", 777));
-		} finally {
-			backend.dispose();
-		}
+				// new value serializer is a V2 serializer with a different schema
+				new TestType.V2TestTypeSerializer()));
 	}
 
+
 	@Test
 	public void testBroadcastStateKeyMigration() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-
-		OperatorStateBackend backend = createOperatorStateBackend();
-
 		final String stateName = "broadcast-state";
-		try {
-			MapStateDescriptor<TestType, Integer> descriptor = new MapStateDescriptor<>(
+
+		testBroadcastStateKeyUpgrade(
+			new MapStateDescriptor<>(
 				stateName,
 				new TestType.V1TestTypeSerializer(),
-				IntSerializer.INSTANCE);
-			BroadcastState<TestType, Integer> state = backend.getBroadcastState(descriptor);
-
-			state.put(new TestType("foo", 13), 3);
-			state.put(new TestType("bar", 278), 5);
-
-			OperatorStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
-			backend.dispose();
-
-			backend = restoreOperatorStateBackend(snapshot);
-
-			descriptor = new MapStateDescriptor<>(
+				IntSerializer.INSTANCE),
+			new MapStateDescriptor<>(
 				stateName,
+				// new key serializer is a V2 serializer with a different schema
 				new TestType.V2TestTypeSerializer(),
-				IntSerializer.INSTANCE);
-			state = backend.getBroadcastState(descriptor);
-
-			// the state backend should have decided whether or not it needs to perform state migration;
-			// make sure that reading and writing each broadcast entry works with the new serializer
-			Assert.assertEquals((Integer) 3, state.get(new TestType("foo", 13)));
-			Assert.assertEquals((Integer) 5, state.get(new TestType("bar", 278)));
-			state.put(new TestType("new-entry", 777), 17);
-		} finally {
-			backend.dispose();
-		}
+				IntSerializer.INSTANCE));
 	}
 
 	@Test
-	public void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-
-		OperatorStateBackend backend = createOperatorStateBackend();
+	public void testBroadcastStateValueSerializerReconfiguration() throws Exception {
+		final String stateName = "broadcast-state";
 
-		final String stateName = "partitionable-list-state";
-		try {
-			ListStateDescriptor<TestType> descriptor = new ListStateDescriptor<>(
+		testBroadcastStateValueUpgrade(
+			new MapStateDescriptor<>(
 				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> state = backend.getListState(descriptor);
-
-			state.add(new TestType("foo", 13));
-			state.add(new TestType("bar", 278));
-
-			OperatorStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
-			backend.dispose();
+				IntSerializer.INSTANCE,
+				new TestType.V1TestTypeSerializer()),
+			new MapStateDescriptor<>(
+				stateName,
+				IntSerializer.INSTANCE,
+				// new value serializer is a new serializer that requires reconfiguration
+				new TestType.ReconfigurationRequiringTestTypeSerializer()));
+	}
 
-			backend = restoreOperatorStateBackend(snapshot);
+	@Test
+	public void testBroadcastStateKeySerializerReconfiguration() throws Exception {
+		final String stateName = "broadcast-state";
 
-			descriptor = new ListStateDescriptor<>(
+		testBroadcastStateKeyUpgrade(
+			new MapStateDescriptor<>(
 				stateName,
-				new TestType.IncompatibleTestTypeSerializer());
+				new TestType.V1TestTypeSerializer(),
+				IntSerializer.INSTANCE),
+			new MapStateDescriptor<>(
+				stateName,
+				// new key serializer is a new serializer that requires reconfiguration
+				new TestType.ReconfigurationRequiringTestTypeSerializer(),
+				IntSerializer.INSTANCE));
+	}
 
-			// the new serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getListState(descriptor);
+	@Test
+	public void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws Exception {
+		final String stateName = "broadcast-state";
+
+		try {
+			testBroadcastStateValueUpgrade(
+				new MapStateDescriptor<>(
+					stateName,
+					IntSerializer.INSTANCE,
+					new TestType.V1TestTypeSerializer()),
+				new MapStateDescriptor<>(
+					stateName,
+					IntSerializer.INSTANCE,
+					// new value serializer is incompatible
+					new TestType.IncompatibleTestTypeSerializer()));
 
 			Assert.fail("should have failed.");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-		} finally {
-			backend.dispose();
 		}
 	}
 
 	@Test
-	public void testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-
-		OperatorStateBackend backend = createOperatorStateBackend();
+	public void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws Exception {
+		final String stateName = "broadcast-state";
 
-		final String stateName = "union-list-state";
 		try {
-			ListStateDescriptor<TestType> descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer());
-			ListState<TestType> state = backend.getUnionListState(descriptor);
-
-			state.add(new TestType("foo", 13));
-			state.add(new TestType("bar", 278));
-
-			OperatorStateHandle snapshot = runSnapshot(
-				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
-			backend.dispose();
-
-			backend = restoreOperatorStateBackend(snapshot);
-
-			descriptor = new ListStateDescriptor<>(
-				stateName,
-				new TestType.IncompatibleTestTypeSerializer());
-
-			// the new serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getUnionListState(descriptor);
+			testBroadcastStateKeyUpgrade(
+				new MapStateDescriptor<>(
+					stateName,
+					new TestType.V1TestTypeSerializer(),
+					IntSerializer.INSTANCE),
+				new MapStateDescriptor<>(
+					stateName,
+					// new key serializer is incompatible
+					new TestType.IncompatibleTestTypeSerializer(),
+					IntSerializer.INSTANCE));
 
 			Assert.fail("should have failed.");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-		} finally {
-			backend.dispose();
 		}
 	}
 
-	@Test
-	public void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws Exception {
+	private void testBroadcastStateValueUpgrade(
+			MapStateDescriptor<Integer, TestType> initialAccessDescriptor,
+			MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 
 		OperatorStateBackend backend = createOperatorStateBackend();
 
-		final String stateName = "broadcast-state";
 		try {
-			MapStateDescriptor<Integer, TestType> descriptor = new MapStateDescriptor<>(
-				stateName,
-				IntSerializer.INSTANCE,
-				new TestType.V1TestTypeSerializer());
-			BroadcastState<Integer, TestType> state = backend.getBroadcastState(descriptor);
+			BroadcastState<Integer, TestType> state = backend.getBroadcastState(initialAccessDescriptor);
 
 			state.put(3, new TestType("foo", 13));
 			state.put(5, new TestType("bar", 278));
@@ -685,35 +801,28 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreOperatorStateBackend(snapshot);
 
-			descriptor = new MapStateDescriptor<>(
-				stateName,
-				IntSerializer.INSTANCE,
-				new TestType.IncompatibleTestTypeSerializer());
+			state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
 
-			// the new value serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getBroadcastState(descriptor);
-
-			Assert.fail("should have failed.");
-		} catch (Exception e) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
+			// the state backend should have decided whether or not it needs to perform state migration;
+			// make sure that reading and writing each broadcast entry works with the new serializer
+			Assert.assertEquals(new TestType("foo", 13), state.get(3));
+			Assert.assertEquals(new TestType("bar", 278), state.get(5));
+			state.put(17, new TestType("new-entry", 777));
 		} finally {
 			backend.dispose();
 		}
 	}
 
-	@Test
-	public void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws Exception {
+	private void testBroadcastStateKeyUpgrade(
+			MapStateDescriptor<TestType, Integer> initialAccessDescriptor,
+			MapStateDescriptor<TestType, Integer> newAccessDescriptorAfterRestore) throws Exception {
+
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 
 		OperatorStateBackend backend = createOperatorStateBackend();
 
-		final String stateName = "broadcast-state";
 		try {
-			MapStateDescriptor<TestType, Integer> descriptor = new MapStateDescriptor<>(
-				stateName,
-				new TestType.V1TestTypeSerializer(),
-				IntSerializer.INSTANCE);
-			BroadcastState<TestType, Integer> state = backend.getBroadcastState(descriptor);
+			BroadcastState<TestType, Integer> state = backend.getBroadcastState(initialAccessDescriptor);
 
 			state.put(new TestType("foo", 13), 3);
 			state.put(new TestType("bar", 278), 5);
@@ -724,17 +833,13 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			backend = restoreOperatorStateBackend(snapshot);
 
-			descriptor = new MapStateDescriptor<>(
-				stateName,
-				new TestType.IncompatibleTestTypeSerializer(),
-				IntSerializer.INSTANCE);
-
-			// the new key serializer is INCOMPATIBLE, so registering the state should fail
-			backend.getBroadcastState(descriptor);
+			state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
 
-			Assert.fail("should have failed.");
-		} catch (Exception e) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
+			// the state backend should have decided whether or not it needs to perform state migration;
+			// make sure that reading and writing each broadcast entry works with the new serializer
+			Assert.assertEquals((Integer) 3, state.get(new TestType("foo", 13)));
+			Assert.assertEquals((Integer) 5, state.get(new TestType("bar", 278)));
+			state.put(new TestType("new-entry", 777), 17);
 		} finally {
 			backend.dispose();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
index de1f2bd..91ebc95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -42,15 +42,15 @@ public class StateSerializerProviderTest {
 	// --------------------------------------------------------------------------------
 
 	@Test
-	public void testCurrentSchemaSerializerForNewStateSerializerProvider() {
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+	public void testCurrentSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V1TestTypeSerializer());
 		assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
 	}
 
 	@Test
-	public void testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
+	public void testCurrentSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 		assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
 	}
 
@@ -59,17 +59,17 @@ public class StateSerializerProviderTest {
 	// --------------------------------------------------------------------------------
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testPreviousSchemaSerializerForNewStateSerializerProvider() {
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+	public void testPreviousSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V1TestTypeSerializer());
 
 		// this should fail with an exception
 		testProvider.previousSchemaSerializer();
 	}
 
 	@Test
-	public void testPreviousSchemaSerializerForRestoredStateSerializerProvider() {
+	public void testPreviousSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 		assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
 	}
 
@@ -78,7 +78,7 @@ public class StateSerializerProviderTest {
 		// create the provider with an exception throwing snapshot;
 		// this would throw an exception if the restore serializer was eagerly accessed
 		StateSerializerProvider<String> testProvider =
-			StateSerializerProvider.fromRestoredState(new ExceptionThrowingSerializerSnapshot());
+			StateSerializerProvider.fromPreviousSerializerSnapshot(new ExceptionThrowingSerializerSnapshot());
 
 		try {
 			// if we fail here, that means the restore serializer was indeed lazily accessed
@@ -94,15 +94,15 @@ public class StateSerializerProviderTest {
 	// --------------------------------------------------------------------------------
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() {
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+	public void testRegisterNewSerializerWithEagerlyRegisteredStateSerializerProviderShouldFail() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V1TestTypeSerializer());
 		testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFail() {
+	public void testRegisterNewSerializerTwiceWithLazilyRegisteredStateSerializerProviderShouldFail() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
 		testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
 
@@ -111,9 +111,9 @@ public class StateSerializerProviderTest {
 	}
 
 	@Test
-	public void testRegisterNewCompatibleAsIsSerializer() {
+	public void testLazilyRegisterNewCompatibleAsIsSerializer() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
 		// register compatible serializer for state
 		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
@@ -125,20 +125,36 @@ public class StateSerializerProviderTest {
 	}
 
 	@Test
-	public void testRegisterNewCompatibleAfterMigrationSerializer() {
+	public void testLazilyRegisterNewCompatibleAfterMigrationSerializer() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
 		// register serializer that requires migration for state
 		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
 			testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
 		assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+		assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V2TestTypeSerializer);
+		assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+	}
+
+	@Test
+	public void testLazilyRegisterNewSerializerRequiringReconfiguration() {
+		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+		// register serializer that requires reconfiguration, and verify that
+		// the resulting current schema serializer is the reconfigured one
+		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+			testProvider.registerNewSerializerForRestoredState(new TestType.ReconfigurationRequiringTestTypeSerializer());
+		assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+		assertTrue(testProvider.currentSchemaSerializer().getClass() == TestType.V1TestTypeSerializer.class);
 	}
 
 	@Test
-	public void testRegisterIncompatibleSerializer() {
+	public void testLazilyRegisterIncompatibleSerializer() {
 		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
-		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
 		// register serializer that requires migration for state
 		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
@@ -148,6 +164,88 @@ public class StateSerializerProviderTest {
 		try {
 			// a serializer for the current schema will no longer be accessible
 			testProvider.currentSchemaSerializer();
+
+			fail();
+		} catch (Exception excepted) {
+			// success
+		}
+	}
+
+	// --------------------------------------------------------------------------------
+	//  Tests for #setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)
+	// --------------------------------------------------------------------------------
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSerializerSnapshotWithLazilyRegisteredSerializerProviderShouldFail() {
+		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+		testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSerializerSnapshotTwiceWithEagerlyRegisteredSerializerProviderShouldFail() {
+		TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(serializer);
+
+		testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+
+		// second registration should fail
+		testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+	}
+
+	@Test
+	public void testEagerlyRegisterNewCompatibleAsIsSerializer() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V1TestTypeSerializer());
+
+		// set previous serializer snapshot for state, which should let the new serializer be considered compatible as is
+		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+			testProvider.setPreviousSerializerSnapshotForRestoredState(new TestType.V1TestTypeSerializer().snapshotConfiguration());
+		assertTrue(schemaCompatibility.isCompatibleAsIs());
+
+		assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+		assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+	}
+
+	@Test
+	public void testEagerlyRegisterCompatibleAfterMigrationSerializer() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V2TestTypeSerializer());
+
+		// set previous serializer snapshot for state, which should let the new serializer be considered compatible after migration
+		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+			testProvider.setPreviousSerializerSnapshotForRestoredState(new TestType.V1TestTypeSerializer().snapshotConfiguration());
+		assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+		assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V2TestTypeSerializer);
+		assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+	}
+
+	@Test
+	public void testEagerlyRegisterNewSerializerRequiringReconfiguration() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.ReconfigurationRequiringTestTypeSerializer());
+
+		// set previous serializer snapshot, which should let the new serializer be considered to require reconfiguration,
+		// and verify that the resulting current schema serializer is the reconfigured one
+		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+			testProvider.setPreviousSerializerSnapshotForRestoredState(new TestType.V1TestTypeSerializer().snapshotConfiguration());
+		assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+		assertTrue(testProvider.currentSchemaSerializer().getClass() == TestType.V1TestTypeSerializer.class);
+	}
+
+	@Test
+	public void testEagerlyRegisterIncompatibleSerializer() {
+		StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewRegisteredSerializer(new TestType.IncompatibleTestTypeSerializer());
+
+		// set previous serializer snapshot for state, which should let the new serializer be considered incompatible
+		TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+			testProvider.setPreviousSerializerSnapshotForRestoredState(new TestType.V1TestTypeSerializer().snapshotConfiguration());
+		assertTrue(schemaCompatibility.isIncompatible());
+
+		try {
+			// a serializer for the current schema will no longer be accessible
+			testProvider.currentSchemaSerializer();
+
+			fail();
 		} catch (Exception excepted) {
 			// success
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index e3b0a06..52d5521 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -149,6 +149,29 @@ public class TestType implements HeapPriorityQueueElement, PriorityComparable<Te
 	}
 
 	/**
+	 * A serializer that is meant to be compatible with any of the serializers only after being reconfigured as a new instance.
+	 */
+	public static class ReconfigurationRequiringTestTypeSerializer extends TestTypeSerializerBase {
+
+		private static final long serialVersionUID = -7254527815207212324L;
+
+		@Override
+		public void serialize(TestType record, DataOutputView target) throws IOException {
+			throw new UnsupportedOperationException("The serializer should have been reconfigured as a new instance; shouldn't be used.");
+		}
+
+		@Override
+		public TestType deserialize(DataInputView source) throws IOException {
+			throw new UnsupportedOperationException("The serializer should have been reconfigured as a new instance; shouldn't be used.");
+		}
+
+		@Override
+		public TypeSerializerSnapshot<TestType> snapshotConfiguration() {
+			throw new UnsupportedOperationException("The serializer should have been reconfigured as a new instance; shouldn't be used.");
+		}
+	}
+
+	/**
 	 * A serializer that is meant to be incompatible with any of the serializers.
 	 */
 	public static class IncompatibleTestTypeSerializer extends TestTypeSerializerBase {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
index b2b802a..9f78ce3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
@@ -42,8 +42,13 @@ public class V1TestTypeSerializerSnapshot implements TypeSerializerSnapshot<Test
 			return TypeSerializerSchemaCompatibility.compatibleAsIs();
 		} else if (newSerializer instanceof TestType.V2TestTypeSerializer) {
 			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-		} else {
+		} else if (newSerializer instanceof TestType.ReconfigurationRequiringTestTypeSerializer) {
+			// we mimic the reconfiguration by just re-instantiating the correct serializer
+			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new TestType.V1TestTypeSerializer());
+		} else if (newSerializer instanceof TestType.IncompatibleTestTypeSerializer) {
 			return TypeSerializerSchemaCompatibility.incompatible();
+		} else {
+			throw new IllegalStateException("Unknown serializer class for TestType.");
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
index 3cd4fff..983a2ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
@@ -40,8 +40,17 @@ public class V2TestTypeSerializerSnapshot implements TypeSerializerSnapshot<Test
 	public TypeSerializerSchemaCompatibility<TestType> resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
 		if (newSerializer instanceof TestType.V2TestTypeSerializer) {
 			return TypeSerializerSchemaCompatibility.compatibleAsIs();
-		} else {
+		} else if (newSerializer instanceof TestType.ReconfigurationRequiringTestTypeSerializer) {
+			// we mimic the reconfiguration by just re-instantiating the correct serializer
+			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new TestType.V2TestTypeSerializer());
+		} else if (
+			// migrating from V2 -> V1 is not supported
+			newSerializer instanceof TestType.V1TestTypeSerializer
+				|| newSerializer instanceof TestType.IncompatibleTestTypeSerializer) {
+
 			return TypeSerializerSchemaCompatibility.incompatible();
+		} else {
+			throw new IllegalStateException("Unknown serializer class for TestType.");
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6b00393..9335762 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -340,7 +340,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
 		final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
-		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
+		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
 		final byte[] nameSpaceBytes;
 		try {
 			RocksDBKeySerializationUtils.writeNameSpace(
@@ -356,7 +356,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
 		iterator.seekToFirst();
 
-		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
 			ambiguousKeyPossible, nameSpaceBytes);
 
 		Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
@@ -550,7 +550,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			new RocksFullSnapshotStrategy<>(
 				db,
 				rocksDBResourceGuard,
-				keySerializer,
+				getKeySerializer(),
 				kvStateInformation,
 				keyGroupRange,
 				keyGroupPrefixBytes,
@@ -577,7 +577,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
 				db,
 				rocksDBResourceGuard,
-				keySerializer,
+				getKeySerializer(),
 				kvStateInformation,
 				keyGroupRange,
 				keyGroupPrefixBytes,
@@ -669,6 +669,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		/** The compression decorator that was used for writing the state, as determined by the meta data. */
 		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
 
+		private boolean isKeySerializerCompatibilityChecked;
+
 		/**
 		 * Creates a restore operation object for the given state backend instance.
 		 *
@@ -734,11 +736,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			serializationProxy.read(currentStateHandleInView);
 
-			// check for key serializer compatibility; this also reconfigures the
-			// key serializer to be compatible, if it is required and is possible
-			if (!serializationProxy.getKeySerializerConfigSnapshot()
-					.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs()) {
-				throw new StateMigrationException("The new key serializer must be compatible.");
+			if (!isKeySerializerCompatibilityChecked) {
+				// check for key serializer compatibility; this also reconfigures the
+				// key serializer to be compatible, if it is required and is possible
+				TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
+					rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
+				if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
+					throw new StateMigrationException("The new key serializer must be compatible.");
+				}
+
+				isKeySerializerCompatibilityChecked = true;
 			}
 
 			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
@@ -835,6 +842,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
 		private UUID restoredBackendUID;
 		private long lastCompletedCheckpointId;
+		private boolean isKeySerializerCompatibilityChecked;
 
 		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
 
@@ -1279,11 +1287,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				if (!serializationProxy.getKeySerializerConfigSnapshot()
-						.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
-					throw new StateMigrationException("The new key serializer must be compatible.");
+				if (!isKeySerializerCompatibilityChecked) {
+					// check for key serializer compatibility; this also reconfigures the
+					// key serializer to be compatible, if it is required and is possible
+					TypeSerializerSchemaCompatibility<T> keySerializerSchemaCompat =
+						stateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
+					if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
+						throw new StateMigrationException("The new key serializer must be compatible.");
+					}
+
+					isKeySerializerCompatibilityChecked = true;
 				}
 
 				return serializationProxy.getStateMetaInfoSnapshots();
@@ -1360,7 +1373,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
 		TypeSerializerSchemaCompatibility<N> s = restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
-		if (!s.isCompatibleAsIs()) {
+		if (s.isCompatibleAfterMigration() || s.isIncompatible()) {
 			throw new StateMigrationException("The new namespace serializer must be compatible.");
 		}