You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:22 UTC
[flink] 06/09: [FLINK-20978] Implement HeapSavepointRestoreOperation
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac493ebc882ee18a7c7e75d15ddcad695de1f51b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:24:03 2021 +0100
[FLINK-20978] Implement HeapSavepointRestoreOperation
This commit implements the logic of restoring a heap keyed state backend
from a savepoint in a unified binary format. It eagerly deserializes all
states and populates the in-memory structures.
---
.../state/heap/HeapMetaInfoRestoreOperation.java | 133 +++++++++++
.../runtime/state/heap/HeapRestoreOperation.java | 94 ++------
.../state/heap/HeapSavepointRestoreOperation.java | 259 +++++++++++++++++++++
3 files changed, 405 insertions(+), 81 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
new file mode 100644
index 0000000..aecc44b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shared helper used by both {@link HeapRestoreOperation} and {@link
+ * HeapSavepointRestoreOperation} to process restored {@link StateMetaInfoSnapshot
+ * StateMetaInfoSnapshots}.
+ *
+ * <p>For each snapshot it makes sure a matching entry exists in the caller-supplied registries: a
+ * {@link StateTable} for key/value state, or a {@link HeapPriorityQueueSnapshotRestoreWrapper}
+ * for priority-queue state.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+class HeapMetaInfoRestoreOperation<K> {
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
+    @Nonnull private final KeyGroupRange keyGroupRange;
+    @Nonnegative private final int numberOfKeyGroups;
+    private final StateTableFactory<K> stateTableFactory;
+    private final InternalKeyContext<K> keyContext;
+
+    HeapMetaInfoRestoreOperation(
+            StateSerializerProvider<K> keySerializerProvider,
+            HeapPriorityQueueSetFactory priorityQueueSetFactory,
+            @Nonnull KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups,
+            StateTableFactory<K> stateTableFactory,
+            InternalKeyContext<K> keyContext) {
+        // Collaborators needed to build state tables.
+        this.stateTableFactory = stateTableFactory;
+        this.keyContext = keyContext;
+        this.keySerializerProvider = keySerializerProvider;
+        // Collaborators needed to build priority queue wrappers.
+        this.priorityQueueSetFactory = priorityQueueSetFactory;
+        this.keyGroupRange = keyGroupRange;
+        this.numberOfKeyGroups = numberOfKeyGroups;
+    }
+
+    /**
+     * Registers a heap structure for every restored state that is not registered yet and assigns
+     * each snapshot a numeric id.
+     *
+     * <p>Ids are handed out in the iteration order of {@code restoredMetaInfo}, starting at 0. A
+     * snapshot receives an id regardless of whether its state had already been registered.
+     *
+     * @param restoredMetaInfo the restored meta info snapshots to process
+     * @param registeredKVStates registry of state tables by state name; missing entries are added
+     * @param registeredPQStates registry of priority queue wrappers by state name; missing entries
+     *     are added
+     * @return the snapshots keyed by their position in {@code restoredMetaInfo}
+     * @throws IllegalStateException if a snapshot is neither key/value nor priority-queue state
+     */
+    Map<Integer, StateMetaInfoSnapshot> createOrCheckStateForMetaInfo(
+            List<StateMetaInfoSnapshot> restoredMetaInfo,
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
+
+        final Map<Integer, StateMetaInfoSnapshot> snapshotsById = new HashMap<>();
+        int nextId = 0;
+
+        for (StateMetaInfoSnapshot snapshot : restoredMetaInfo) {
+            switch (snapshot.getBackendStateType()) {
+                case KEY_VALUE:
+                    registerKeyValueStateIfAbsent(snapshot, registeredKVStates);
+                    break;
+                case PRIORITY_QUEUE:
+                    registerPriorityQueueStateIfAbsent(snapshot, registeredPQStates);
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            "Unexpected state type: " + snapshot.getBackendStateType() + ".");
+            }
+
+            // The id is assigned unconditionally: it reflects the snapshot's position in the
+            // restored list, not whether a new state had to be registered.
+            snapshotsById.put(nextId++, snapshot);
+        }
+
+        return snapshotsById;
+    }
+
+    /** Creates and registers a state table for the snapshot's state name if none exists yet. */
+    private void registerKeyValueStateIfAbsent(
+            StateMetaInfoSnapshot snapshot, Map<String, StateTable<K, ?, ?>> registeredKVStates) {
+
+        final String stateName = snapshot.getName();
+        if (registeredKVStates.get(stateName) != null) {
+            return;
+        }
+
+        final RegisteredKeyValueStateBackendMetaInfo<?, ?> kvMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+        registeredKVStates.put(
+                stateName,
+                stateTableFactory.newStateTable(
+                        keyContext, kvMetaInfo, keySerializerProvider.currentSchemaSerializer()));
+    }
+
+    /** Creates and registers a priority queue wrapper for the snapshot's state name if absent. */
+    private void registerPriorityQueueStateIfAbsent(
+            StateMetaInfoSnapshot snapshot,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
+
+        final String stateName = snapshot.getName();
+        if (registeredPQStates.get(stateName) == null) {
+            registeredPQStates.put(
+                    stateName,
+                    createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot)));
+        }
+    }
+
+    /**
+     * Builds a priority queue for the given meta info and wraps it for snapshot/restore.
+     *
+     * @param metaInfo meta info supplying the state name and the element serializer
+     * @return a wrapper around a newly created priority queue set
+     */
+    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+            HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal(
+                    RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+
+        final HeapPriorityQueueSet<T> queue =
+                priorityQueueSetFactory.create(metaInfo.getName(), metaInfo.getElementSerializer());
+
+        return new HeapPriorityQueueSnapshotRestoreWrapper<>(
+                queue,
+                metaInfo,
+                KeyExtractorFunction.forKeyedObjects(),
+                keyGroupRange,
+                numberOfKeyGroups);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 86a7491..e78751b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -24,16 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.PriorityComparable;
-import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
@@ -49,13 +44,11 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -75,11 +68,8 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
private final CloseableRegistry cancelStreamRegistry;
- private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
@Nonnull private final KeyGroupRange keyGroupRange;
- @Nonnegative private final int numberOfKeyGroups;
- private final StateTableFactory<K> stateTableFactory;
- private final InternalKeyContext<K> keyContext;
+ private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;
HeapRestoreOperation(
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@@ -91,7 +81,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
HeapPriorityQueueSetFactory priorityQueueSetFactory,
@Nonnull KeyGroupRange keyGroupRange,
int numberOfKeyGroups,
- StateTableFactory<K> snapshotStrategy,
+ StateTableFactory<K> stateTableFactory,
InternalKeyContext<K> keyContext) {
this.restoreStateHandles = restoreStateHandles;
this.keySerializerProvider = keySerializerProvider;
@@ -99,11 +89,15 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
this.registeredKVStates = registeredKVStates;
this.registeredPQStates = registeredPQStates;
this.cancelStreamRegistry = cancelStreamRegistry;
- this.priorityQueueSetFactory = priorityQueueSetFactory;
this.keyGroupRange = keyGroupRange;
- this.numberOfKeyGroups = numberOfKeyGroups;
- this.stateTableFactory = snapshotStrategy;
- this.keyContext = keyContext;
+ this.heapMetaInfoRestoreOperation =
+ new HeapMetaInfoRestoreOperation<>(
+ keySerializerProvider,
+ priorityQueueSetFactory,
+ keyGroupRange,
+ numberOfKeyGroups,
+ stateTableFactory,
+ keyContext);
}
@Override
@@ -165,9 +159,9 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
List<StateMetaInfoSnapshot> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
- final Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>();
-
- createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);
+ final Map<Integer, StateMetaInfoSnapshot> kvStatesById =
+ this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
+ restoredMetaInfos, registeredKVStates, registeredPQStates);
readStateHandleStateData(
fsDataInputStream,
@@ -187,68 +181,6 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
return null;
}
- private void createOrCheckStateForMetaInfo(
- List<StateMetaInfoSnapshot> restoredMetaInfo,
- Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
-
- for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
- final StateSnapshotRestore registeredState;
-
- switch (metaInfoSnapshot.getBackendStateType()) {
- case KEY_VALUE:
- registeredState = registeredKVStates.get(metaInfoSnapshot.getName());
- if (registeredState == null) {
- RegisteredKeyValueStateBackendMetaInfo<?, ?>
- registeredKeyedBackendStateMetaInfo =
- new RegisteredKeyValueStateBackendMetaInfo<>(
- metaInfoSnapshot);
- registeredKVStates.put(
- metaInfoSnapshot.getName(),
- stateTableFactory.newStateTable(
- keyContext,
- registeredKeyedBackendStateMetaInfo,
- keySerializerProvider.currentSchemaSerializer()));
- }
- break;
- case PRIORITY_QUEUE:
- registeredState = registeredPQStates.get(metaInfoSnapshot.getName());
- if (registeredState == null) {
- createInternal(
- new RegisteredPriorityQueueStateBackendMetaInfo<>(
- metaInfoSnapshot));
- }
- break;
- default:
- throw new IllegalStateException(
- "Unexpected state type: "
- + metaInfoSnapshot.getBackendStateType()
- + ".");
- }
-
- // always put metaInfo into kvStatesById, because kvStatesById is KeyGroupsStateHandle
- // related
- kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
- }
- }
-
- private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> void createInternal(
- RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
-
- final String stateName = metaInfo.getName();
- final HeapPriorityQueueSet<T> priorityQueue =
- priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer());
-
- HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
- new HeapPriorityQueueSnapshotRestoreWrapper<>(
- priorityQueue,
- metaInfo,
- KeyExtractorFunction.forKeyedObjects(),
- keyGroupRange,
- numberOfKeyGroups);
-
- registeredPQStates.put(stateName, wrapper);
- }
-
private void readStateHandleStateData(
FSDataInputStream fsDataInputStream,
DataInputViewStreamWrapper inView,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
new file mode 100644
index 0000000..13fdf8f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RestoreOperation;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
+import org.apache.flink.runtime.state.restore.KeyGroup;
+import org.apache.flink.runtime.state.restore.KeyGroupEntry;
+import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
+import org.apache.flink.runtime.state.restore.ThrowingIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKey;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKeyGroup;
+import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readNamespace;
+
+/**
+ * Implementation of heap savepoint restore operation. Savepoint shares a common unified binary
+ * format across all state backends.
+ *
+ * <p>Every entry of the savepoint is deserialized eagerly and inserted into the registered
+ * {@link StateTable StateTables} and priority queues.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class HeapSavepointRestoreOperation<K> implements RestoreOperation<Void> {
+    private final int keyGroupPrefixBytes;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
+    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
+    private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;
+    /*
+       Shared wrappers for deserializing an entry in the state handle. An optimization
+       to reduce the number of objects created. Each read method re-points them at the
+       current entry via setBuffer(...) before use, so no data carries over between entries.
+    */
+    private final DataInputDeserializer entryKeyDeserializer;
+    private final DataInputDeserializer entryValueDeserializer;
+    private final ListDelimitedSerializer listDelimitedSerializer;
+
+    HeapSavepointRestoreOperation(
+            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
+            StateSerializerProvider<K> keySerializerProvider,
+            ClassLoader userCodeClassLoader,
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            HeapPriorityQueueSetFactory priorityQueueSetFactory,
+            @Nonnull KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups,
+            StateTableFactory<K> stateTableFactory,
+            InternalKeyContext<K> keyContext) {
+        this.keySerializerProvider = keySerializerProvider;
+        this.registeredKVStates = registeredKVStates;
+        this.registeredPQStates = registeredPQStates;
+        this.savepointRestoreOperation =
+                new FullSnapshotRestoreOperation<>(
+                        keyGroupRange,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
+        this.keyGroupPrefixBytes = computeRequiredBytesInKeyGroupPrefix(numberOfKeyGroups);
+        this.heapMetaInfoRestoreOperation =
+                new HeapMetaInfoRestoreOperation<>(
+                        keySerializerProvider,
+                        priorityQueueSetFactory,
+                        keyGroupRange,
+                        numberOfKeyGroups,
+                        stateTableFactory,
+                        keyContext);
+        this.entryKeyDeserializer = new DataInputDeserializer();
+        this.entryValueDeserializer = new DataInputDeserializer();
+        this.listDelimitedSerializer = new ListDelimitedSerializer();
+    }
+
+    /**
+     * Clears the registered key/value and priority-queue states and repopulates them from every
+     * {@link SavepointRestoreResult} produced by the underlying {@link
+     * FullSnapshotRestoreOperation}.
+     *
+     * @return always {@code null}
+     * @throws Exception if iterating the savepoint or deserializing an entry fails
+     */
+    @Override
+    public Void restore() throws Exception {
+
+        registeredKVStates.clear();
+        registeredPQStates.clear();
+
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                this.savepointRestoreOperation.restore()) {
+            while (restore.hasNext()) {
+                SavepointRestoreResult restoreResult = restore.next();
+                List<StateMetaInfoSnapshot> restoredMetaInfos =
+                        restoreResult.getStateMetaInfoSnapshots();
+
+                // Registers any state not seen so far and yields the id -> meta info mapping
+                // used to resolve the entries of this restore result.
+                final Map<Integer, StateMetaInfoSnapshot> kvStatesById =
+                        this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
+                                restoredMetaInfos, registeredKVStates, registeredPQStates);
+
+                try (ThrowingIterator<KeyGroup> keyGroups = restoreResult.getRestoredKeyGroups()) {
+                    while (keyGroups.hasNext()) {
+                        readKeyGroupStateData(
+                                keyGroups.next(),
+                                keySerializerProvider.previousSchemaSerializer(),
+                                kvStatesById);
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Reads all entries of one key group and dispatches each to the reader matching the type of
+     * the state it belongs to.
+     *
+     * @param keyGroup the key group whose entries are read
+     * @param keySerializer serializer used to read the key of key/value entries
+     * @param kvStatesById meta info snapshots by the state id referenced from the entries
+     * @throws IllegalStateException if an entry references an unknown state id, or a state that
+     *     is neither key/value nor priority-queue state
+     */
+    private void readKeyGroupStateData(
+            KeyGroup keyGroup,
+            TypeSerializer<K> keySerializer,
+            Map<Integer, StateMetaInfoSnapshot> kvStatesById)
+            throws Exception {
+
+        try (ThrowingIterator<KeyGroupEntry> entries = keyGroup.getKeyGroupEntries()) {
+            while (entries.hasNext()) {
+                KeyGroupEntry groupEntry = entries.next();
+                StateMetaInfoSnapshot infoSnapshot = kvStatesById.get(groupEntry.getKvStateId());
+                if (infoSnapshot == null) {
+                    // Fail with context instead of a bare NullPointerException below.
+                    throw new IllegalStateException(
+                            "No state meta info found for state id "
+                                    + groupEntry.getKvStateId()
+                                    + ".");
+                }
+                switch (infoSnapshot.getBackendStateType()) {
+                    case KEY_VALUE:
+                        readKVStateData(keySerializer, groupEntry, infoSnapshot);
+                        break;
+                    case PRIORITY_QUEUE:
+                        readPriorityQueue(groupEntry, infoSnapshot);
+                        break;
+                    default:
+                        // Anything else must not be silently skipped.
+                        throw new IllegalStateException(
+                                "Expected only keyed state. Received: "
+                                        + infoSnapshot.getBackendStateType());
+                }
+            }
+        }
+    }
+
+    /**
+     * Deserializes one priority queue element from the key bytes of the entry and adds it to the
+     * registered priority queue of the given state.
+     *
+     * @param groupEntry the entry whose key bytes hold the key-group prefix and the element
+     * @param infoSnapshot meta info of the priority-queue state the entry belongs to
+     */
+    @SuppressWarnings("unchecked")
+    private void readPriorityQueue(KeyGroupEntry groupEntry, StateMetaInfoSnapshot infoSnapshot)
+            throws IOException {
+        // Reuse the shared deserializer rather than allocating a new one for every entry.
+        entryKeyDeserializer.setBuffer(groupEntry.getKey());
+        // The element follows the key-group prefix, which is not needed here.
+        entryKeyDeserializer.skipBytesToRead(keyGroupPrefixBytes);
+        HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>
+                priorityQueueSnapshotRestoreWrapper =
+                        (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>)
+                                registeredPQStates.get(infoSnapshot.getName());
+        HeapPriorityQueueElement timer =
+                priorityQueueSnapshotRestoreWrapper
+                        .getMetaInfo()
+                        .getElementSerializer()
+                        .deserialize(entryKeyDeserializer);
+        HeapPriorityQueueSet<HeapPriorityQueueElement> priorityQueue =
+                priorityQueueSnapshotRestoreWrapper.getPriorityQueue();
+        priorityQueue.add(timer);
+    }
+
+    /**
+     * Deserializes one key/value entry and stores it in the registered state table of the given
+     * state.
+     *
+     * <p>The entry key is read as key group, key and namespace, in that order. How the value is
+     * decoded depends on the state type recorded in the table's meta info.
+     *
+     * @param keySerializer serializer used to read the key
+     * @param groupEntry the raw key and value bytes of the entry
+     * @param infoSnapshot meta info of the key/value state the entry belongs to
+     * @throws IllegalStateException if the state type is not handled
+     */
+    @SuppressWarnings("unchecked")
+    private void readKVStateData(
+            TypeSerializer<K> keySerializer,
+            KeyGroupEntry groupEntry,
+            StateMetaInfoSnapshot infoSnapshot)
+            throws IOException {
+        StateTable<K, Object, Object> stateTable =
+                (StateTable<K, Object, Object>) registeredKVStates.get(infoSnapshot.getName());
+        RegisteredKeyValueStateBackendMetaInfo<?, ?> metaInfo = stateTable.getMetaInfo();
+        TypeSerializer<?> namespaceSerializer = metaInfo.getPreviousNamespaceSerializer();
+        TypeSerializer<?> stateSerializer = metaInfo.getPreviousStateSerializer();
+        // True when both the key and the namespace serializer report a negative length; the
+        // flag is handed to readKey/readNamespace below.
+        boolean isAmbiguousKey =
+                keySerializer.getLength() < 0 && namespaceSerializer.getLength() < 0;
+        entryKeyDeserializer.setBuffer(groupEntry.getKey());
+        entryValueDeserializer.setBuffer(groupEntry.getValue());
+        int keyGroup = readKeyGroup(keyGroupPrefixBytes, entryKeyDeserializer);
+        K key = readKey(keySerializer, entryKeyDeserializer, isAmbiguousKey);
+        Object namespace = readNamespace(namespaceSerializer, entryKeyDeserializer, isAmbiguousKey);
+        switch (metaInfo.getStateType()) {
+            case LIST:
+                // Lists are decoded from the raw value bytes by the ListDelimitedSerializer.
+                stateTable.put(
+                        key,
+                        keyGroup,
+                        namespace,
+                        listDelimitedSerializer.deserializeList(
+                                groupEntry.getValue(),
+                                ((ListSerializer<?>) stateSerializer).getElementSerializer()));
+                break;
+            case VALUE:
+            case REDUCING:
+            case FOLDING:
+            case AGGREGATING:
+                stateTable.put(
+                        key,
+                        keyGroup,
+                        namespace,
+                        stateSerializer.deserialize(entryValueDeserializer));
+                break;
+            case MAP:
+                // One entry holds a single user map entry; it is merged into the map that is
+                // stored for (key, namespace).
+                deserializeMapStateEntry(
+                        (StateTable<K, Object, Map<Object, Object>>)
+                                (StateTable<K, ?, ?>) stateTable,
+                        keyGroup,
+                        key,
+                        namespace,
+                        (MapSerializer<Object, Object>) stateSerializer);
+                break;
+            default:
+                throw new IllegalStateException("Unknown state type: " + metaInfo.getStateType());
+        }
+    }
+
+    /**
+     * Reads a single user map entry and adds it to the map stored for {@code key} and {@code
+     * namespace}, creating and registering that map on first use.
+     *
+     * <p>Must be called after key group, key and namespace have been read from {@code
+     * entryKeyDeserializer}: the user map key is read from its remaining bytes. The value bytes
+     * start with a boolean null marker, followed by the serialized value if it is not null.
+     *
+     * @param stateTable the state table holding the user maps
+     * @param keyGroup the key group of the entry
+     * @param key the key of the entry
+     * @param namespace the namespace of the entry
+     * @param stateSerializer the map serializer providing the key and value serializers
+     */
+    private void deserializeMapStateEntry(
+            StateTable<K, Object, Map<Object, Object>> stateTable,
+            int keyGroup,
+            K key,
+            Object namespace,
+            MapSerializer<Object, Object> stateSerializer)
+            throws IOException {
+        Object mapEntryKey = stateSerializer.getKeySerializer().deserialize(entryKeyDeserializer);
+        boolean isNull = entryValueDeserializer.readBoolean();
+        final Object mapEntryValue;
+        if (isNull) {
+            mapEntryValue = null;
+        } else {
+            mapEntryValue =
+                    stateSerializer.getValueSerializer().deserialize(entryValueDeserializer);
+        }
+
+        Map<Object, Object> userMap = stateTable.get(key, namespace);
+        if (userMap == null) {
+            userMap = new HashMap<>();
+            stateTable.put(key, keyGroup, namespace, userMap);
+        }
+        userMap.put(mapEntryKey, mapEntryValue);
+    }
+}