You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 06:11:30 UTC
[4/4] flink git commit: [hotfix] Restore KeySerializer only once
[hotfix] Restore KeySerializer only once
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698
Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200
----------------------------------------------------------------------
.../state/heap/HeapKeyedStateBackend.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int numRegisteredKvStates = 0;
stateTables.clear();
+ boolean keySerializerRestored = false;
+
for (KeyedStateHandle keyedStateHandle : state) {
if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.read(inView);
- // check for key serializer compatibility; this also reconfigures the
- // key serializer to be compatible, if it is required and is possible
- if (StateMigrationUtil.resolveCompatibilityResult(
+ if (!keySerializerRestored) {
+ // check for key serializer compatibility; this also reconfigures the
+ // key serializer to be compatible, if it is required and is possible
+ if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) keySerializer)
- .isRequiresMigration()) {
+ .isRequiresMigration()) {
- // TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is currently not available");
- }
+ // TODO replace with state migration; note that key hash codes need to remain the same after migration
+ throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+ "Aborting now since state migration is currently not available");
+ }
+ keySerializerRestored = true;
+ }
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();