You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:23 UTC

[flink] 07/09: [FLINK-20978] Implement SavepointKeyedStateHandle

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d1e386e808ddbd211bc2b0daaa441c82e2cd1ae
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:24:32 2021 +0100

    [FLINK-20978] Implement SavepointKeyedStateHandle
    
    Introduce a marker SavepointKeyedStateHandle interface for state handles that describe savepoints. Based on the interface we can later decide which strategy to use when restoring from the handle.
---
 .../metadata/MetadataV2V3SerializerBase.java       | 17 +++--
 .../state/CheckpointStreamWithResultProvider.java  | 22 +++++--
 .../runtime/state/FullSnapshotAsyncWriter.java     | 13 +++-
 .../state/KeyGroupsSavepointStateHandle.java       | 72 ++++++++++++++++++++++
 .../runtime/state/SavepointKeyedStateHandle.java   | 22 +++++++
 .../state/heap/HeapKeyedStateBackendBuilder.java   | 72 ++++++++++++++++------
 .../runtime/state/heap/HeapSnapshotStrategy.java   |  3 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   | 30 +++++++--
 .../state/snapshot/RocksFullSnapshotStrategy.java  |  4 +-
 9 files changed, 219 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 010003d..5bb8820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -91,6 +92,7 @@ public abstract class MetadataV2V3SerializerBase {
     private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
     private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
     private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
+    private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7;
 
     // ------------------------------------------------------------------------
     //  (De)serialization entry points
@@ -280,7 +282,11 @@ public abstract class MetadataV2V3SerializerBase {
         } else if (stateHandle instanceof KeyGroupsStateHandle) {
             KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
 
-            dos.writeByte(KEY_GROUPS_HANDLE);
+            if (stateHandle instanceof KeyGroupsSavepointStateHandle) {
+                dos.writeByte(SAVEPOINT_KEY_GROUPS_HANDLE);
+            } else {
+                dos.writeByte(KEY_GROUPS_HANDLE);
+            }
             dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
             dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
             for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
@@ -316,8 +322,7 @@ public abstract class MetadataV2V3SerializerBase {
         if (NULL_HANDLE == type) {
 
             return null;
-        } else if (KEY_GROUPS_HANDLE == type) {
-
+        } else if (KEY_GROUPS_HANDLE == type || SAVEPOINT_KEY_GROUPS_HANDLE == type) {
             int startKeyGroup = dis.readInt();
             int numKeyGroups = dis.readInt();
             KeyGroupRange keyGroupRange =
@@ -329,7 +334,11 @@ public abstract class MetadataV2V3SerializerBase {
             KeyGroupRangeOffsets keyGroupRangeOffsets =
                     new KeyGroupRangeOffsets(keyGroupRange, offsets);
             StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
-            return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+            if (SAVEPOINT_KEY_GROUPS_HANDLE == type) {
+                return new KeyGroupsSavepointStateHandle(keyGroupRangeOffsets, stateHandle);
+            } else {
+                return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+            }
         } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
             long checkpointId = dis.readLong();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
index 4a8c89b..cd7b181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -195,27 +195,39 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
     }
 
     /**
+     * Factory method for a {@link KeyedStateHandle} to be used in {@link
+     * #toKeyedStateHandleSnapshotResult(SnapshotResult, KeyGroupRangeOffsets,
+     * KeyedStateHandleFactory)}.
+     */
+    @FunctionalInterface
+    interface KeyedStateHandleFactory {
+        KeyedStateHandle create(
+                KeyGroupRangeOffsets keyGroupRangeOffsets, StreamStateHandle streamStateHandle);
+    }
+
+    /**
      * Helper method that takes a {@link SnapshotResult<StreamStateHandle>} and a {@link
-     * KeyGroupRangeOffsets} and creates a {@link SnapshotResult<KeyGroupsStateHandle>} by combining
-     * the key groups offsets with all the present stream state handles.
+     * KeyGroupRangeOffsets} and creates a {@link SnapshotResult<KeyedStateHandle>} by combining the
+     * key groups offsets with all the present stream state handles.
      */
     @Nonnull
     static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(
             @Nonnull SnapshotResult<StreamStateHandle> snapshotResult,
-            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) {
+            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets,
+            @Nonnull KeyedStateHandleFactory stateHandleFactory) {
 
         StreamStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
 
         if (jobManagerOwnedSnapshot != null) {
 
             KeyedStateHandle jmKeyedState =
-                    new KeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot);
+                    stateHandleFactory.create(keyGroupRangeOffsets, jobManagerOwnedSnapshot);
             StreamStateHandle taskLocalSnapshot = snapshotResult.getTaskLocalSnapshot();
 
             if (taskLocalSnapshot != null) {
 
                 KeyedStateHandle localKeyedState =
-                        new KeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot);
+                        stateHandleFactory.create(keyGroupRangeOffsets, taskLocalSnapshot);
                 return SnapshotResult.withLocalState(jmKeyedState, localKeyedState);
             } else {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
index ffc7684..c5a7277 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -50,8 +51,10 @@ public class FullSnapshotAsyncWriter<K>
             checkpointStreamSupplier;
 
     @Nonnull private final FullSnapshotResources<K> snapshotResources;
+    @Nonnull private final CheckpointType checkpointType;
 
     public FullSnapshotAsyncWriter(
+            @Nonnull CheckpointType checkpointType,
             @Nonnull
                     SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                             checkpointStreamSupplier,
@@ -59,6 +62,7 @@ public class FullSnapshotAsyncWriter<K>
 
         this.checkpointStreamSupplier = checkpointStreamSupplier;
         this.snapshotResources = snapshotResources;
+        this.checkpointType = checkpointType;
     }
 
     @Override
@@ -73,9 +77,16 @@ public class FullSnapshotAsyncWriter<K>
         writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
 
         if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
+            final CheckpointStreamWithResultProvider.KeyedStateHandleFactory stateHandleFactory;
+            if (checkpointType.isSavepoint()) {
+                stateHandleFactory = KeyGroupsSavepointStateHandle::new;
+            } else {
+                stateHandleFactory = KeyGroupsStateHandle::new;
+            }
             return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
                     checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
-                    keyGroupRangeOffsets);
+                    keyGroupRangeOffsets,
+                    stateHandleFactory);
         } else {
             throw new IOException("Stream is already unregistered/closed.");
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java
new file mode 100644
index 0000000..aa8bb41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/** A {@link KeyGroupsStateHandle} that describes a savepoint in the unified format. */
+public class KeyGroupsSavepointStateHandle extends KeyGroupsStateHandle
+        implements SavepointKeyedStateHandle {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param groupRangeOffsets range of key-group ids that are in the state of this handle
+     * @param streamStateHandle handle to the actual state of the key-groups
+     */
+    public KeyGroupsSavepointStateHandle(
+            KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+        super(groupRangeOffsets, streamStateHandle);
+    }
+
+    /**
+     * @param keyGroupRange a key group range to intersect.
+     * @return key-group state over a range that is the intersection between this handle's key-group
+     *     range and the provided key-group range.
+     */
+    @Override
+    public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        KeyGroupRangeOffsets offsets = getGroupRangeOffsets().getIntersection(keyGroupRange);
+        if (offsets.getKeyGroupRange().getNumberOfKeyGroups() <= 0) {
+            return null;
+        }
+        return new KeyGroupsSavepointStateHandle(offsets, getDelegateStateHandle());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (!(o instanceof KeyGroupsSavepointStateHandle)) {
+            return false;
+        }
+
+        return super.equals(o);
+    }
+
+    @Override
+    public String toString() {
+        return "KeyGroupsSavepointStateHandle{"
+                + "groupRangeOffsets="
+                + getGroupRangeOffsets()
+                + ", stateHandle="
+                + getDelegateStateHandle()
+                + '}';
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java
new file mode 100644
index 0000000..ae0ebaf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/** A {@link KeyedStateHandle} that points to a savepoint taken in the unified format. */
+public interface SavepointKeyedStateHandle extends KeyedStateHandle {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 12bbc05..6519461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RestoreOperation;
+import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -104,25 +106,7 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
             stateTableFactory = NestedMapsStateTable::new;
         }
 
-        HeapRestoreOperation<K> restoreOperation =
-                new HeapRestoreOperation<>(
-                        restoreStateHandles,
-                        keySerializerProvider,
-                        userCodeClassLoader,
-                        registeredKVStates,
-                        registeredPQStates,
-                        cancelStreamRegistry,
-                        priorityQueueSetFactory,
-                        keyGroupRange,
-                        numberOfKeyGroups,
-                        stateTableFactory,
-                        keyContext);
-        try {
-            restoreOperation.restore();
-            logger.info("Finished to build heap keyed state-backend.");
-        } catch (Exception e) {
-            throw new BackendBuildingException("Failed when trying to restore heap backend", e);
-        }
+        restoreState(registeredKVStates, registeredPQStates, keyContext, stateTableFactory);
         return new HeapKeyedStateBackend<>(
                 kvStateRegistry,
                 keySerializerProvider.currentSchemaSerializer(),
@@ -144,6 +128,56 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
                 keyContext);
     }
 
+    private void restoreState(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            InternalKeyContext<K> keyContext,
+            StateTableFactory<K> stateTableFactory)
+            throws BackendBuildingException {
+        final RestoreOperation<Void> restoreOperation;
+
+        final KeyedStateHandle firstHandle;
+        if (restoreStateHandles.isEmpty()) {
+            firstHandle = null;
+        } else {
+            firstHandle = restoreStateHandles.iterator().next();
+        }
+        if (firstHandle instanceof SavepointKeyedStateHandle) {
+            restoreOperation =
+                    new HeapSavepointRestoreOperation<>(
+                            restoreStateHandles,
+                            keySerializerProvider,
+                            userCodeClassLoader,
+                            registeredKVStates,
+                            registeredPQStates,
+                            priorityQueueSetFactory,
+                            keyGroupRange,
+                            numberOfKeyGroups,
+                            stateTableFactory,
+                            keyContext);
+        } else {
+            restoreOperation =
+                    new HeapRestoreOperation<>(
+                            restoreStateHandles,
+                            keySerializerProvider,
+                            userCodeClassLoader,
+                            registeredKVStates,
+                            registeredPQStates,
+                            cancelStreamRegistry,
+                            priorityQueueSetFactory,
+                            keyGroupRange,
+                            numberOfKeyGroups,
+                            stateTableFactory,
+                            keyContext);
+        }
+        try {
+            restoreOperation.restore();
+            logger.info("Finished to build heap keyed state-backend.");
+        } catch (Exception e) {
+            throw new BackendBuildingException("Failed when trying to restore heap backend", e);
+        }
+    }
+
     private HeapSnapshotStrategy<K> initSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
             Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 4010422..fe48c90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -206,7 +207,7 @@ class HeapSnapshotStrategy<K>
                         new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
                 SnapshotResult<StreamStateHandle> result =
                         streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-                return toKeyedStateHandleSnapshotResult(result, kgOffs);
+                return toKeyedStateHandleSnapshotResult(result, kgOffs, KeyGroupsStateHandle::new);
             } else {
                 throw new IOException("Stream already unregistered.");
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 766298c..d8c83e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -126,14 +128,25 @@ public class CheckpointTestUtils {
                 }
 
                 if (hasKeyedBackend) {
-                    state.setRawKeyedState(
-                            isIncremental && !isSavepoint(basePath)
-                                    ? createDummyIncrementalKeyedStateHandle(random)
-                                    : createDummyKeyGroupStateHandle(random, basePath));
+                    final KeyedStateHandle stateHandle;
+                    if (isSavepoint(basePath)) {
+                        stateHandle = createDummyKeyGroupSavepointStateHandle(random, basePath);
+                    } else if (isIncremental) {
+                        stateHandle = createDummyIncrementalKeyedStateHandle(random);
+                    } else {
+                        stateHandle = createDummyKeyGroupStateHandle(random, null);
+                    }
+                    state.setRawKeyedState(stateHandle);
                 }
 
                 if (hasKeyedStream) {
-                    state.setManagedKeyedState(createDummyKeyGroupStateHandle(random, basePath));
+                    final KeyedStateHandle stateHandle;
+                    if (isSavepoint(basePath)) {
+                        stateHandle = createDummyKeyGroupSavepointStateHandle(random, basePath);
+                    } else {
+                        stateHandle = createDummyKeyGroupStateHandle(random, null);
+                    }
+                    state.setManagedKeyedState(stateHandle);
                 }
 
                 state.setInputChannelState(
@@ -217,6 +230,13 @@ public class CheckpointTestUtils {
         return result;
     }
 
+    public static KeyGroupsStateHandle createDummyKeyGroupSavepointStateHandle(
+            Random rnd, String basePath) {
+        return new KeyGroupsSavepointStateHandle(
+                new KeyGroupRangeOffsets(1, 1, new long[] {rnd.nextInt(1024)}),
+                createDummyStreamStateHandle(rnd, basePath));
+    }
+
     public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd, String basePath) {
         return new KeyGroupsStateHandle(
                 new KeyGroupRangeOffsets(1, 1, new long[] {rnd.nextInt(1024)}),
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 63bf351..0b83bd8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -137,7 +137,9 @@ public class RocksFullSnapshotStrategy<K>
                                 checkpointId, checkpointStreamFactory, checkpointOptions);
 
         return new FullSnapshotAsyncWriter<>(
-                checkpointStreamSupplier, fullRocksDBSnapshotResources);
+                checkpointOptions.getCheckpointType(),
+                checkpointStreamSupplier,
+                fullRocksDBSnapshotResources);
     }
 
     @Override