You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:22 UTC

[flink] 06/09: [FLINK-20978] Implement HeapSavepointRestoreOperation

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac493ebc882ee18a7c7e75d15ddcad695de1f51b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:24:03 2021 +0100

    [FLINK-20978] Implement HeapSavepointRestoreOperation
    
    This commit implements the logic of restoring a heap keyed state backend
    from a savepoint in a unified binary format. It eagerly deserializes all
    states and populates the in memory structures.
---
 .../state/heap/HeapMetaInfoRestoreOperation.java   | 133 +++++++++++
 .../runtime/state/heap/HeapRestoreOperation.java   |  94 ++------
 .../state/heap/HeapSavepointRestoreOperation.java  | 259 +++++++++++++++++++++
 3 files changed, 405 insertions(+), 81 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
new file mode 100644
index 0000000..aecc44b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A helper class shared between the {@link HeapRestoreOperation} and {@link
+ * HeapSavepointRestoreOperation} for restoring {@link StateMetaInfoSnapshot
+ * StateMetaInfoSnapshots}.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+class HeapMetaInfoRestoreOperation<K> {
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
+    @Nonnull private final KeyGroupRange keyGroupRange;
+    @Nonnegative private final int numberOfKeyGroups;
+    private final StateTableFactory<K> stateTableFactory;
+    private final InternalKeyContext<K> keyContext;
+
+    HeapMetaInfoRestoreOperation(
+            StateSerializerProvider<K> keySerializerProvider,
+            HeapPriorityQueueSetFactory priorityQueueSetFactory,
+            @Nonnull KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups,
+            StateTableFactory<K> stateTableFactory,
+            InternalKeyContext<K> keyContext) {
+        this.keySerializerProvider = keySerializerProvider;
+        this.priorityQueueSetFactory = priorityQueueSetFactory;
+        this.keyGroupRange = keyGroupRange;
+        this.numberOfKeyGroups = numberOfKeyGroups;
+        this.stateTableFactory = stateTableFactory;
+        this.keyContext = keyContext;
+    }
+
+    Map<Integer, StateMetaInfoSnapshot> createOrCheckStateForMetaInfo(
+            List<StateMetaInfoSnapshot> restoredMetaInfo,
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
+
+        final Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>();
+        for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
+            final StateSnapshotRestore registeredState;
+
+            switch (metaInfoSnapshot.getBackendStateType()) {
+                case KEY_VALUE:
+                    registeredState = registeredKVStates.get(metaInfoSnapshot.getName());
+                    if (registeredState == null) {
+                        RegisteredKeyValueStateBackendMetaInfo<?, ?>
+                                registeredKeyedBackendStateMetaInfo =
+                                        new RegisteredKeyValueStateBackendMetaInfo<>(
+                                                metaInfoSnapshot);
+                        registeredKVStates.put(
+                                metaInfoSnapshot.getName(),
+                                stateTableFactory.newStateTable(
+                                        keyContext,
+                                        registeredKeyedBackendStateMetaInfo,
+                                        keySerializerProvider.currentSchemaSerializer()));
+                    }
+                    break;
+                case PRIORITY_QUEUE:
+                    registeredState = registeredPQStates.get(metaInfoSnapshot.getName());
+                    if (registeredState == null) {
+                        registeredPQStates.put(
+                                metaInfoSnapshot.getName(),
+                                createInternal(
+                                        new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                                                metaInfoSnapshot)));
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            "Unexpected state type: "
+                                    + metaInfoSnapshot.getBackendStateType()
+                                    + ".");
+            }
+
+            // always put metaInfo into kvStatesById, because kvStatesById is KeyGroupsStateHandle
+            // related
+            kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
+        }
+
+        return kvStatesById;
+    }
+
+    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+            HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal(
+                    RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+
+        final String stateName = metaInfo.getName();
+        final HeapPriorityQueueSet<T> priorityQueue =
+                priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer());
+
+        return new HeapPriorityQueueSnapshotRestoreWrapper<>(
+                priorityQueue,
+                metaInfo,
+                KeyExtractorFunction.forKeyedObjects(),
+                keyGroupRange,
+                numberOfKeyGroups);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 86a7491..e78751b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -24,16 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.PriorityComparable;
-import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RestoreOperation;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StateSerializerProvider;
@@ -49,13 +44,11 @@ import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -75,11 +68,8 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
     private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
     private final CloseableRegistry cancelStreamRegistry;
-    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
     @Nonnull private final KeyGroupRange keyGroupRange;
-    @Nonnegative private final int numberOfKeyGroups;
-    private final StateTableFactory<K> stateTableFactory;
-    private final InternalKeyContext<K> keyContext;
+    private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;
 
     HeapRestoreOperation(
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@@ -91,7 +81,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
             @Nonnull KeyGroupRange keyGroupRange,
             int numberOfKeyGroups,
-            StateTableFactory<K> snapshotStrategy,
+            StateTableFactory<K> stateTableFactory,
             InternalKeyContext<K> keyContext) {
         this.restoreStateHandles = restoreStateHandles;
         this.keySerializerProvider = keySerializerProvider;
@@ -99,11 +89,15 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
         this.registeredKVStates = registeredKVStates;
         this.registeredPQStates = registeredPQStates;
         this.cancelStreamRegistry = cancelStreamRegistry;
-        this.priorityQueueSetFactory = priorityQueueSetFactory;
         this.keyGroupRange = keyGroupRange;
-        this.numberOfKeyGroups = numberOfKeyGroups;
-        this.stateTableFactory = snapshotStrategy;
-        this.keyContext = keyContext;
+        this.heapMetaInfoRestoreOperation =
+                new HeapMetaInfoRestoreOperation<>(
+                        keySerializerProvider,
+                        priorityQueueSetFactory,
+                        keyGroupRange,
+                        numberOfKeyGroups,
+                        stateTableFactory,
+                        keyContext);
     }
 
     @Override
@@ -165,9 +159,9 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
                 List<StateMetaInfoSnapshot> restoredMetaInfos =
                         serializationProxy.getStateMetaInfoSnapshots();
 
-                final Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>();
-
-                createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);
+                final Map<Integer, StateMetaInfoSnapshot> kvStatesById =
+                        this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
+                                restoredMetaInfos, registeredKVStates, registeredPQStates);
 
                 readStateHandleStateData(
                         fsDataInputStream,
@@ -187,68 +181,6 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
         return null;
     }
 
-    private void createOrCheckStateForMetaInfo(
-            List<StateMetaInfoSnapshot> restoredMetaInfo,
-            Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
-
-        for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
-            final StateSnapshotRestore registeredState;
-
-            switch (metaInfoSnapshot.getBackendStateType()) {
-                case KEY_VALUE:
-                    registeredState = registeredKVStates.get(metaInfoSnapshot.getName());
-                    if (registeredState == null) {
-                        RegisteredKeyValueStateBackendMetaInfo<?, ?>
-                                registeredKeyedBackendStateMetaInfo =
-                                        new RegisteredKeyValueStateBackendMetaInfo<>(
-                                                metaInfoSnapshot);
-                        registeredKVStates.put(
-                                metaInfoSnapshot.getName(),
-                                stateTableFactory.newStateTable(
-                                        keyContext,
-                                        registeredKeyedBackendStateMetaInfo,
-                                        keySerializerProvider.currentSchemaSerializer()));
-                    }
-                    break;
-                case PRIORITY_QUEUE:
-                    registeredState = registeredPQStates.get(metaInfoSnapshot.getName());
-                    if (registeredState == null) {
-                        createInternal(
-                                new RegisteredPriorityQueueStateBackendMetaInfo<>(
-                                        metaInfoSnapshot));
-                    }
-                    break;
-                default:
-                    throw new IllegalStateException(
-                            "Unexpected state type: "
-                                    + metaInfoSnapshot.getBackendStateType()
-                                    + ".");
-            }
-
-            // always put metaInfo into kvStatesById, because kvStatesById is KeyGroupsStateHandle
-            // related
-            kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
-        }
-    }
-
-    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> void createInternal(
-            RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
-
-        final String stateName = metaInfo.getName();
-        final HeapPriorityQueueSet<T> priorityQueue =
-                priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer());
-
-        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
-                new HeapPriorityQueueSnapshotRestoreWrapper<>(
-                        priorityQueue,
-                        metaInfo,
-                        KeyExtractorFunction.forKeyedObjects(),
-                        keyGroupRange,
-                        numberOfKeyGroups);
-
-        registeredPQStates.put(stateName, wrapper);
-    }
-
     private void readStateHandleStateData(
             FSDataInputStream fsDataInputStream,
             DataInputViewStreamWrapper inView,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
new file mode 100644
index 0000000..13fdf8f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RestoreOperation;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
+import org.apache.flink.runtime.state.restore.KeyGroup;
+import org.apache.flink.runtime.state.restore.KeyGroupEntry;
+import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
+import org.apache.flink.runtime.state.restore.ThrowingIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKey;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKeyGroup;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readNamespace;
+
+/**
+ * Implementation of heap savepoint restore operation. Savepoint shares a common unified binary
+ * format across all state backends.
+ *
+ * @param <K> The data type that the serializer serializes.
+ */
+public class HeapSavepointRestoreOperation<K> implements RestoreOperation<Void> {
+    private final int keyGroupPrefixBytes;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
+    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
+    private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;
+    /*
+       Shared wrappers for deserializing an entry in the state handle. An optimization
+       to reduce the number of objects created.
+    */
+    private final DataInputDeserializer entryKeyDeserializer;
+    private final DataInputDeserializer entryValueDeserializer;
+    private final ListDelimitedSerializer listDelimitedSerializer;
+
+    HeapSavepointRestoreOperation(
+            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
+            StateSerializerProvider<K> keySerializerProvider,
+            ClassLoader userCodeClassLoader,
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            HeapPriorityQueueSetFactory priorityQueueSetFactory,
+            @Nonnull KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups,
+            StateTableFactory<K> stateTableFactory,
+            InternalKeyContext<K> keyContext) {
+        this.keySerializerProvider = keySerializerProvider;
+        this.registeredKVStates = registeredKVStates;
+        this.registeredPQStates = registeredPQStates;
+        this.savepointRestoreOperation =
+                new FullSnapshotRestoreOperation<>(
+                        keyGroupRange,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
+        this.keyGroupPrefixBytes = computeRequiredBytesInKeyGroupPrefix(numberOfKeyGroups);
+        this.heapMetaInfoRestoreOperation =
+                new HeapMetaInfoRestoreOperation<>(
+                        keySerializerProvider,
+                        priorityQueueSetFactory,
+                        keyGroupRange,
+                        numberOfKeyGroups,
+                        stateTableFactory,
+                        keyContext);
+        this.entryKeyDeserializer = new DataInputDeserializer();
+        this.entryValueDeserializer = new DataInputDeserializer();
+        this.listDelimitedSerializer = new ListDelimitedSerializer();
+    }
+
+    @Override
+    public Void restore() throws Exception {
+
+        registeredKVStates.clear();
+        registeredPQStates.clear();
+
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                this.savepointRestoreOperation.restore()) {
+            while (restore.hasNext()) {
+                SavepointRestoreResult restoreResult = restore.next();
+                List<StateMetaInfoSnapshot> restoredMetaInfos =
+                        restoreResult.getStateMetaInfoSnapshots();
+
+                final Map<Integer, StateMetaInfoSnapshot> kvStatesById =
+                        this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
+                                restoredMetaInfos, registeredKVStates, registeredPQStates);
+
+                try (ThrowingIterator<KeyGroup> keyGroups = restoreResult.getRestoredKeyGroups()) {
+                    while (keyGroups.hasNext()) {
+                        readKeyGroupStateData(
+                                keyGroups.next(),
+                                keySerializerProvider.previousSchemaSerializer(),
+                                kvStatesById);
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private void readKeyGroupStateData(
+            KeyGroup keyGroup,
+            TypeSerializer<K> keySerializer,
+            Map<Integer, StateMetaInfoSnapshot> kvStatesById)
+            throws Exception {
+
+        try (ThrowingIterator<KeyGroupEntry> entries = keyGroup.getKeyGroupEntries()) {
+            while (entries.hasNext()) {
+                KeyGroupEntry groupEntry = entries.next();
+                StateMetaInfoSnapshot infoSnapshot = kvStatesById.get(groupEntry.getKvStateId());
+                switch (infoSnapshot.getBackendStateType()) {
+                    case KEY_VALUE:
+                        readKVStateData(keySerializer, groupEntry, infoSnapshot);
+                        break;
+                    case PRIORITY_QUEUE:
+                        readPriorityQueue(groupEntry, infoSnapshot);
+                        break;
+                    case OPERATOR:
+                    case BROADCAST:
+                        throw new IllegalStateException(
+                                "Expected only keyed state. Received: "
+                                        + infoSnapshot.getBackendStateType());
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void readPriorityQueue(KeyGroupEntry groupEntry, StateMetaInfoSnapshot infoSnapshot)
+            throws IOException {
+        DataInputDeserializer keyDeserializer = new DataInputDeserializer(groupEntry.getKey());
+        keyDeserializer.skipBytesToRead(keyGroupPrefixBytes);
+        HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>
+                priorityQueueSnapshotRestoreWrapper =
+                        (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>)
+                                registeredPQStates.get(infoSnapshot.getName());
+        HeapPriorityQueueElement timer =
+                priorityQueueSnapshotRestoreWrapper
+                        .getMetaInfo()
+                        .getElementSerializer()
+                        .deserialize(keyDeserializer);
+        HeapPriorityQueueSet<HeapPriorityQueueElement> priorityQueue =
+                priorityQueueSnapshotRestoreWrapper.getPriorityQueue();
+        priorityQueue.add(timer);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void readKVStateData(
+            TypeSerializer<K> keySerializer,
+            KeyGroupEntry groupEntry,
+            StateMetaInfoSnapshot infoSnapshot)
+            throws IOException {
+        StateTable<K, Object, Object> stateTable =
+                (StateTable<K, Object, Object>) registeredKVStates.get(infoSnapshot.getName());
+        RegisteredKeyValueStateBackendMetaInfo<?, ?> metaInfo = stateTable.getMetaInfo();
+        TypeSerializer<?> namespaceSerializer = metaInfo.getPreviousNamespaceSerializer();
+        TypeSerializer<?> stateSerializer = metaInfo.getPreviousStateSerializer();
+        boolean isAmbigousKey =
+                keySerializer.getLength() < 0 && namespaceSerializer.getLength() < 0;
+        entryKeyDeserializer.setBuffer(groupEntry.getKey());
+        entryValueDeserializer.setBuffer(groupEntry.getValue());
+        int keyGroup = readKeyGroup(keyGroupPrefixBytes, entryKeyDeserializer);
+        K key = readKey(keySerializer, entryKeyDeserializer, isAmbigousKey);
+        Object namespace = readNamespace(namespaceSerializer, entryKeyDeserializer, isAmbigousKey);
+        switch (metaInfo.getStateType()) {
+            case LIST:
+                stateTable.put(
+                        key,
+                        keyGroup,
+                        namespace,
+                        listDelimitedSerializer.deserializeList(
+                                groupEntry.getValue(),
+                                ((ListSerializer<?>) stateSerializer).getElementSerializer()));
+                break;
+            case VALUE:
+            case REDUCING:
+            case FOLDING:
+            case AGGREGATING:
+                stateTable.put(
+                        key,
+                        keyGroup,
+                        namespace,
+                        stateSerializer.deserialize(entryValueDeserializer));
+                break;
+            case MAP:
+                deserializeMapStateEntry(
+                        (StateTable<K, Object, Map<Object, Object>>)
+                                (StateTable<K, ?, ?>) stateTable,
+                        keyGroup,
+                        key,
+                        namespace,
+                        (MapSerializer<Object, Object>) stateSerializer);
+                break;
+            default:
+                throw new IllegalStateException("Unknown state type: " + metaInfo.getStateType());
+        }
+    }
+
+    private void deserializeMapStateEntry(
+            StateTable<K, Object, Map<Object, Object>> stateTable,
+            int keyGroup,
+            K key,
+            Object namespace,
+            MapSerializer<Object, Object> stateSerializer)
+            throws IOException {
+        Object mapEntryKey = stateSerializer.getKeySerializer().deserialize(entryKeyDeserializer);
+        boolean isNull = entryValueDeserializer.readBoolean();
+        final Object mapEntryValue;
+        if (isNull) {
+            mapEntryValue = null;
+        } else {
+            mapEntryValue =
+                    stateSerializer.getValueSerializer().deserialize(entryValueDeserializer);
+        }
+
+        Map<Object, Object> userMap = stateTable.get(key, namespace);
+        if (userMap == null) {
+            userMap = new HashMap<>();
+            stateTable.put(key, keyGroup, namespace, userMap);
+        }
+        userMap.put(mapEntryKey, mapEntryValue);
+    }
+}