You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/10 15:46:08 UTC

[flink] 02/02: Revert "[FLINK-21206] Write savepoints in unified format from HeapStateBackend"

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3187c3431039c261b81fd641a157c6fb1e37a4bb
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Feb 10 16:31:06 2021 +0100

    Revert "[FLINK-21206] Write savepoints in unified format from HeapStateBackend"
    
    This reverts commit fc995d3af957941e0e16e56e6830ef97acbadfd1.
---
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  19 +--
 .../state/heap/HeapKeyedStateBackendBuilder.java   |  16 +-
 .../runtime/state/heap/HeapSavepointStrategy.java  |  95 -----------
 .../runtime/state/heap/HeapSnapshotResources.java  | 190 ---------------------
 .../runtime/state/heap/HeapSnapshotStrategy.java   | 116 +++++++++++--
 flink-tests/pom.xml                                |   9 -
 .../state/HeapSavepointStateBackendSwitchTest.java |   4 +-
 7 files changed, 111 insertions(+), 338 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0b42a32..c81bb68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -106,9 +106,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
      * The snapshot strategy for this backend. This determines, e.g., if snapshots are synchronous
      * or asynchronous.
      */
-    private final SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner;
-
-    private final SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner;
+    private final SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner;
 
     private final StateTableFactory<K> stateTableFactory;
 
@@ -127,8 +125,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             LocalRecoveryConfig localRecoveryConfig,
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
-            SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner,
-            SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner,
+            SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
             StateTableFactory<K> stateTableFactory,
             InternalKeyContext<K> keyContext) {
         super(
@@ -144,8 +141,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         this.registeredPQStates = registeredPQStates;
         this.localRecoveryConfig = localRecoveryConfig;
         this.priorityQueueSetFactory = priorityQueueSetFactory;
-        this.checkpointStrategyRunner = checkpointStrategyRunner;
-        this.savepointStrategyRunner = savepointStrategyRunner;
+        this.snapshotStrategyRunner = snapshotStrategyRunner;
         this.stateTableFactory = stateTableFactory;
         LOG.info("Initializing heap keyed state backend with stream factory.");
     }
@@ -360,13 +356,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
 
-        if (checkpointOptions.getCheckpointType().isSavepoint()) {
-            return savepointStrategyRunner.snapshot(
-                    checkpointId, timestamp, streamFactory, checkpointOptions);
-        } else {
-            return checkpointStrategyRunner.snapshot(
-                    checkpointId, timestamp, streamFactory, checkpointOptions);
-        }
+        return snapshotStrategyRunner.snapshot(
+                checkpointId, timestamp, streamFactory, checkpointOptions);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 3991ba0..6519461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -96,14 +96,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
         CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
         HeapSnapshotStrategy<K> snapshotStrategy =
                 initSnapshotStrategy(registeredKVStates, registeredPQStates);
-        HeapSavepointStrategy<K> savepointStrategy =
-                new HeapSavepointStrategy<>(
-                        registeredKVStates,
-                        registeredPQStates,
-                        keyGroupCompressionDecorator,
-                        keyGroupRange,
-                        keySerializerProvider,
-                        numberOfKeyGroups);
         InternalKeyContext<K> keyContext =
                 new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
 
@@ -132,11 +124,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
                         snapshotStrategy,
                         cancelStreamRegistryForBackend,
                         asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
-                new SnapshotStrategyRunner<>(
-                        "Heap backend savepoint",
-                        savepointStrategy,
-                        cancelStreamRegistryForBackend,
-                        asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
                 stateTableFactory,
                 keyContext);
     }
@@ -200,7 +187,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
                 keyGroupCompressionDecorator,
                 localRecoveryConfig,
                 keyGroupRange,
-                keySerializerProvider,
-                numberOfKeyGroups);
+                keySerializerProvider);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
deleted file mode 100644
index ed70ed7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SnapshotStrategy;
-import org.apache.flink.runtime.state.StateSerializerProvider;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-
-/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
-class HeapSavepointStrategy<K>
-        implements SnapshotStrategy<KeyedStateHandle, FullSnapshotResources<K>> {
-
-    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
-    private final StreamCompressionDecorator keyGroupCompressionDecorator;
-    private final KeyGroupRange keyGroupRange;
-    private final StateSerializerProvider<K> keySerializerProvider;
-    private final int totalKeyGroups;
-
-    HeapSavepointStrategy(
-            Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
-            StreamCompressionDecorator keyGroupCompressionDecorator,
-            KeyGroupRange keyGroupRange,
-            StateSerializerProvider<K> keySerializerProvider,
-            int totalKeyGroups) {
-        this.registeredKVStates = registeredKVStates;
-        this.registeredPQStates = registeredPQStates;
-        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
-        this.keyGroupRange = keyGroupRange;
-        this.keySerializerProvider = keySerializerProvider;
-        this.totalKeyGroups = totalKeyGroups;
-    }
-
-    @Override
-    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
-        return HeapSnapshotResources.create(
-                registeredKVStates,
-                registeredPQStates,
-                keyGroupCompressionDecorator,
-                keyGroupRange,
-                getKeySerializer(),
-                totalKeyGroups);
-    }
-
-    @Override
-    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            FullSnapshotResources<K> syncPartResource,
-            long checkpointId,
-            long timestamp,
-            @Nonnull CheckpointStreamFactory streamFactory,
-            @Nonnull CheckpointOptions checkpointOptions) {
-
-        assert checkpointOptions.getCheckpointType().isSavepoint();
-        return new FullSnapshotAsyncWriter<>(
-                checkpointOptions.getCheckpointType(),
-                () ->
-                        CheckpointStreamWithResultProvider.createSimpleStream(
-                                CheckpointedStateScope.EXCLUSIVE, streamFactory),
-                syncPartResource);
-    }
-
-    public TypeSerializer<K> getKeySerializer() {
-        return keySerializerProvider.currentSchemaSerializer();
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
deleted file mode 100644
index aa2c0af..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyValueStateIterator;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.StateSnapshotRestore;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A set of resources required to take a checkpoint or savepoint from a {@link
- * HeapKeyedStateBackend}.
- */
-@Internal
-final class HeapSnapshotResources<K> implements FullSnapshotResources<K> {
-    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
-    private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
-    private final StreamCompressionDecorator streamCompressionDecorator;
-    private final Map<StateUID, Integer> stateNamesToId;
-    private final KeyGroupRange keyGroupRange;
-    private final TypeSerializer<K> keySerializer;
-    private final int totalKeyGroups;
-
-    private HeapSnapshotResources(
-            List<StateMetaInfoSnapshot> metaInfoSnapshots,
-            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-            StreamCompressionDecorator streamCompressionDecorator,
-            Map<StateUID, Integer> stateNamesToId,
-            KeyGroupRange keyGroupRange,
-            TypeSerializer<K> keySerializer,
-            int totalKeyGroups) {
-        this.metaInfoSnapshots = metaInfoSnapshots;
-        this.cowStateStableSnapshots = cowStateStableSnapshots;
-        this.streamCompressionDecorator = streamCompressionDecorator;
-        this.stateNamesToId = stateNamesToId;
-        this.keyGroupRange = keyGroupRange;
-        this.keySerializer = keySerializer;
-        this.totalKeyGroups = totalKeyGroups;
-    }
-
-    public static <K> HeapSnapshotResources<K> create(
-            Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
-            StreamCompressionDecorator streamCompressionDecorator,
-            KeyGroupRange keyGroupRange,
-            TypeSerializer<K> keySerializer,
-            int totalKeyGroups) {
-
-        if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
-            return new HeapSnapshotResources<>(
-                    Collections.emptyList(),
-                    Collections.emptyMap(),
-                    streamCompressionDecorator,
-                    Collections.emptyMap(),
-                    keyGroupRange,
-                    keySerializer,
-                    totalKeyGroups);
-        }
-
-        int numStates = registeredKVStates.size() + registeredPQStates.size();
-
-        Preconditions.checkState(
-                numStates <= Short.MAX_VALUE,
-                "Too many states: "
-                        + numStates
-                        + ". Currently at most "
-                        + Short.MAX_VALUE
-                        + " states are supported");
-
-        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
-        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
-        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
-                registeredKVStates,
-                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
-                registeredPQStates,
-                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
-
-        return new HeapSnapshotResources<>(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                streamCompressionDecorator,
-                stateNamesToId,
-                keyGroupRange,
-                keySerializer,
-                totalKeyGroups);
-    }
-
-    private static void processSnapshotMetaInfoForAllStates(
-            List<StateMetaInfoSnapshot> metaInfoSnapshots,
-            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-            Map<StateUID, Integer> stateNamesToId,
-            Map<String, ? extends StateSnapshotRestore> registeredStates,
-            StateMetaInfoSnapshot.BackendStateType stateType) {
-
-        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
-                registeredStates.entrySet()) {
-            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
-            stateNamesToId.put(stateUid, stateNamesToId.size());
-            StateSnapshotRestore state = kvState.getValue();
-            if (null != state) {
-                final StateSnapshot stateSnapshot = state.stateSnapshot();
-                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
-                cowStateStableSnapshots.put(stateUid, stateSnapshot);
-            }
-        }
-    }
-
-    @Override
-    public void release() {
-        for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
-            stateSnapshot.release();
-        }
-    }
-
-    public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
-        return metaInfoSnapshots;
-    }
-
-    @Override
-    public KeyValueStateIterator createKVStateIterator() throws IOException {
-        return new HeapKeyValueStateIterator(
-                keyGroupRange,
-                keySerializer,
-                totalKeyGroups,
-                stateNamesToId,
-                cowStateStableSnapshots);
-    }
-
-    @Override
-    public KeyGroupRange getKeyGroupRange() {
-        return keyGroupRange;
-    }
-
-    @Override
-    public TypeSerializer<K> getKeySerializer() {
-        return keySerializer;
-    }
-
-    @Override
-    public StreamCompressionDecorator getStreamCompressionDecorator() {
-        return streamCompressionDecorator;
-    }
-
-    public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
-        return cowStateStableSnapshots;
-    }
-
-    public Map<StateUID, Integer> getStateNamesToId() {
-        return stateNamesToId;
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 78b96e5..fe48c90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -30,20 +30,26 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -54,7 +60,7 @@ import static org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.
 
 /** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
 class HeapSnapshotStrategy<K>
-        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
+        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotStrategy.HeapSnapshotResources> {
 
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
     private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
@@ -62,7 +68,6 @@ class HeapSnapshotStrategy<K>
     private final LocalRecoveryConfig localRecoveryConfig;
     private final KeyGroupRange keyGroupRange;
     private final StateSerializerProvider<K> keySerializerProvider;
-    private final int totalKeyGroups;
 
     HeapSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
@@ -70,31 +75,58 @@ class HeapSnapshotStrategy<K>
             StreamCompressionDecorator keyGroupCompressionDecorator,
             LocalRecoveryConfig localRecoveryConfig,
             KeyGroupRange keyGroupRange,
-            StateSerializerProvider<K> keySerializerProvider,
-            int totalKeyGroups) {
+            StateSerializerProvider<K> keySerializerProvider) {
         this.registeredKVStates = registeredKVStates;
         this.registeredPQStates = registeredPQStates;
         this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
         this.localRecoveryConfig = localRecoveryConfig;
         this.keyGroupRange = keyGroupRange;
         this.keySerializerProvider = keySerializerProvider;
-        this.totalKeyGroups = totalKeyGroups;
     }
 
     @Override
-    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
-        return HeapSnapshotResources.create(
+    public HeapSnapshotResources syncPrepareResources(long checkpointId) {
+
+        if (!hasRegisteredState()) {
+            return new HeapSnapshotResources(
+                    Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
+        }
+
+        int numStates = registeredKVStates.size() + registeredPQStates.size();
+
+        Preconditions.checkState(
+                numStates <= Short.MAX_VALUE,
+                "Too many states: "
+                        + numStates
+                        + ". Currently at most "
+                        + Short.MAX_VALUE
+                        + " states are supported");
+
+        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
+        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
+        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
                 registeredKVStates,
+                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
                 registeredPQStates,
-                keyGroupCompressionDecorator,
-                keyGroupRange,
-                getKeySerializer(),
-                totalKeyGroups);
+                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+
+        return new HeapSnapshotResources(
+                metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId);
     }
 
     @Override
     public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            HeapSnapshotResources<K> syncPartResource,
+            HeapSnapshotResources syncPartResource,
             long checkpointId,
             long timestamp,
             @Nonnull CheckpointStreamFactory streamFactory,
@@ -110,7 +142,7 @@ class HeapSnapshotStrategy<K>
                         // TODO: this code assumes that writing a serializer is threadsafe, we
                         // should support to
                         // get a serialized form already at state registration time in the future
-                        syncPartResource.getKeySerializer(),
+                        getKeySerializer(),
                         metaInfoSnapshots,
                         !Objects.equals(
                                 UncompressedStreamCompressionDecorator.INSTANCE,
@@ -182,7 +214,65 @@ class HeapSnapshotStrategy<K>
         };
     }
 
+    private void processSnapshotMetaInfoForAllStates(
+            List<StateMetaInfoSnapshot> metaInfoSnapshots,
+            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+            Map<StateUID, Integer> stateNamesToId,
+            Map<String, ? extends StateSnapshotRestore> registeredStates,
+            StateMetaInfoSnapshot.BackendStateType stateType) {
+
+        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
+                registeredStates.entrySet()) {
+            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
+            stateNamesToId.put(stateUid, stateNamesToId.size());
+            StateSnapshotRestore state = kvState.getValue();
+            if (null != state) {
+                final StateSnapshot stateSnapshot = state.stateSnapshot();
+                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+                cowStateStableSnapshots.put(stateUid, stateSnapshot);
+            }
+        }
+    }
+
+    private boolean hasRegisteredState() {
+        return !(registeredKVStates.isEmpty() && registeredPQStates.isEmpty());
+    }
+
     public TypeSerializer<K> getKeySerializer() {
         return keySerializerProvider.currentSchemaSerializer();
     }
+
+    static class HeapSnapshotResources implements SnapshotResources {
+        private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
+        private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
+        private final Map<StateUID, Integer> stateNamesToId;
+
+        HeapSnapshotResources(
+                @Nonnull List<StateMetaInfoSnapshot> metaInfoSnapshots,
+                @Nonnull Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+                @Nonnull Map<StateUID, Integer> stateNamesToId) {
+            this.metaInfoSnapshots = metaInfoSnapshots;
+            this.cowStateStableSnapshots = cowStateStableSnapshots;
+            this.stateNamesToId = stateNamesToId;
+        }
+
+        @Override
+        public void release() {
+            for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
+                stateSnapshot.release();
+            }
+        }
+
+        public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
+            return metaInfoSnapshots;
+        }
+
+        public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
+            return cowStateStableSnapshots;
+        }
+
+        public Map<StateUID, Integer> getStateNamesToId() {
+            return stateNamesToId;
+        }
+    }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index fb30a1d..7031344 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,15 +231,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>com.github.oshi</groupId>
 			<artifactId>oshi-core</artifactId>
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
index 5138e03..d5708a6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 
 /** Tests for switching a HEAP state backend to a different one. */
 @RunWith(Parameterized.class)
@@ -35,6 +35,6 @@ public class HeapSavepointStateBackendSwitchTest extends SavepointStateBackendSw
 
     @Parameterized.Parameters
     public static Collection<BackendSwitchSpec> targetBackends() {
-        return Arrays.asList(BackendSwitchSpecs.HEAP, BackendSwitchSpecs.ROCKS);
+        return Collections.singletonList(BackendSwitchSpecs.HEAP);
     }
 }