You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 06:11:30 UTC

[4/4] flink git commit: [hotfix] Restore KeySerializer only once

[hotfix] Restore KeySerializer only once


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698

Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		int numRegisteredKvStates = 0;
 		stateTables.clear();
 
+		boolean keySerializerRestored = false;
+
 		for (KeyedStateHandle keyedStateHandle : state) {
 
 			if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.read(inView);
 
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				if (StateMigrationUtil.resolveCompatibilityResult(
+				if (!keySerializerRestored) {
+					// check for key serializer compatibility; this also reconfigures the
+					// key serializer to be compatible, if it is required and is possible
+					if (StateMigrationUtil.resolveCompatibilityResult(
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
 						(TypeSerializer) keySerializer)
-					.isRequiresMigration()) {
+						.isRequiresMigration()) {
 
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
-						"Aborting now since state migration is currently not available");
-				}
+						// TODO replace with state migration; note that key hash codes need to remain the same after migration
+						throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+							"Aborting now since state migration is currently not available");
+					}
 
+					keySerializerRestored = true;
+				}
+				
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();