You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/08/09 03:25:41 UTC

[flink] branch master updated: [FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a067e5e1c [FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format
d9a067e5e1c is described below

commit d9a067e5e1c8672930b0ea7d76400a1d3020a1e2
Author: Lihe Ma <ma...@163.com>
AuthorDate: Thu Jun 30 10:42:28 2022 +0800

    [FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format
    
    This closes #20399.
---
 .../flink/runtime/state/ttl/TtlStateTestBase.java  |   6 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java      | 318 +++++++++++++++++++-
 .../state/snapshot/RocksFullSnapshotStrategy.java  | 169 -----------
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 323 ++-------------------
 .../snapshot/RocksNativeFullSnapshotStrategy.java  | 255 ++++++++++++++++
 .../state/snapshot/RocksSnapshotUtil.java          |  13 +
 .../state/EmbeddedRocksDBStateBackendTest.java     |  10 -
 .../RocksIncrementalSnapshotStrategyTest.java      |   2 +-
 .../state/ttl/FullSnapshotRocksDbTtlStateTest.java |   5 +
 .../state/ttl/IncSnapshotRocksDbTtlStateTest.java  |   2 +-
 .../ResumeCheckpointManuallyITCase.java            |  91 +++++-
 .../test/checkpointing/SavepointFormatITCase.java  |   9 +-
 13 files changed, 728 insertions(+), 492 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index d6225ab0c4b..c1e291450f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -95,7 +95,7 @@ public abstract class TtlStateTestBase {
                 new TtlReducingStateTestContext());
     }
 
-    public boolean fullSnapshot() {
+    public boolean isSavepoint() {
         return true;
     }
 
@@ -354,7 +354,7 @@ public abstract class TtlStateTestBase {
 
     @Test
     public void testMultipleKeysWithSnapshotCleanup() throws Exception {
-        assumeTrue("full snapshot strategy", fullSnapshot());
+        assumeTrue("full snapshot strategy", isSavepoint());
         initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
         // set time back after restore to see entry unexpired if it was not cleaned up in snapshot
         // properly
@@ -369,7 +369,7 @@ public abstract class TtlStateTestBase {
 
     @Test
     public void testMultipleNamespacesWithSnapshotCleanup() throws Exception {
-        assumeTrue("full snapshot strategy", fullSnapshot());
+        assumeTrue("full snapshot strategy", isSavepoint());
         initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
         // set time back after restore to see entry unexpired if it was not cleaned up in snapshot
         // properly
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index ca1adc27303..7007f1b5463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -28,8 +28,8 @@ import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperat
 import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
-import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
@@ -524,11 +524,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles,
             long lastCompletedCheckpointId) {
         RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
+        RocksDBStateUploader stateUploader =
+                injectRocksDBStateUploader == null
+                        ? new RocksDBStateUploader(numberOfTransferingThreads)
+                        : injectRocksDBStateUploader;
         if (enableIncrementalCheckpointing) {
-            RocksDBStateUploader stateUploader =
-                    injectRocksDBStateUploader == null
-                            ? new RocksDBStateUploader(numberOfTransferingThreads)
-                            : injectRocksDBStateUploader;
             checkpointSnapshotStrategy =
                     new RocksIncrementalSnapshotStrategy<>(
                             db,
@@ -546,16 +546,17 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                             lastCompletedCheckpointId);
         } else {
             checkpointSnapshotStrategy =
-                    new RocksFullSnapshotStrategy<>(
+                    new RocksNativeFullSnapshotStrategy<>(
                             db,
                             rocksDBResourceGuard,
                             keySerializerProvider.currentSchemaSerializer(),
                             kvStateInformation,
-                            registeredPQStates,
                             keyGroupRange,
                             keyGroupPrefixBytes,
                             localRecoveryConfig,
-                            keyGroupCompressionDecorator);
+                            instanceBasePath,
+                            backendUID,
+                            stateUploader);
         }
         return checkpointSnapshotStrategy;
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index b064fc54921..de2be83d8ef 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -21,21 +21,53 @@ package org.apache.flink.contrib.streaming.state.snapshot;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResources;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.ResourceGuard;
 
+import org.rocksdb.Checkpoint;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
 
 /**
  * Abstract base class for {@link SnapshotStrategy} implementations for RocksDB state backend.
@@ -43,7 +75,11 @@ import java.util.LinkedHashMap;
  * @param <K> type of the backend keys.
  */
 public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
-        implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
+        implements CheckpointListener,
+                SnapshotStrategy<
+                        KeyedStateHandle,
+                        RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources>,
+                AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
 
@@ -69,6 +105,15 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
     /** The configuration for local recovery. */
     @Nonnull protected final LocalRecoveryConfig localRecoveryConfig;
 
+    /** Base path of the RocksDB instance. */
+    @Nonnull protected final File instanceBasePath;
+
+    /** The local directory name of the current snapshot strategy. */
+    protected final String localDirectoryName;
+
+    /** Unique ID of this backend; also used to derive the local snapshot directory name. */
+    @Nonnull protected final UUID backendUID;
+
     public RocksDBSnapshotStrategyBase(
             @Nonnull String description,
             @Nonnull RocksDB db,
@@ -77,7 +122,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
             @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
             @Nonnull KeyGroupRange keyGroupRange,
             @Nonnegative int keyGroupPrefixBytes,
-            @Nonnull LocalRecoveryConfig localRecoveryConfig) {
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull File instanceBasePath,
+            @Nonnull UUID backendUID) {
         this.db = db;
         this.rocksDBResourceGuard = rocksDBResourceGuard;
         this.keySerializer = keySerializer;
@@ -86,6 +133,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.localRecoveryConfig = localRecoveryConfig;
         this.description = description;
+        this.instanceBasePath = instanceBasePath;
+        this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
+        this.backendUID = backendUID;
     }
 
     @Nonnull
@@ -93,6 +143,270 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
         return description;
     }
 
+    @Override
+    public NativeRocksDBSnapshotResources syncPrepareResources(long checkpointId) throws Exception {
+
+        final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
+        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
+
+        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
+                new ArrayList<>(kvStateInformation.size());
+        final PreviousSnapshot previousSnapshot =
+                snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
+
+        takeDBNativeCheckpoint(snapshotDirectory);
+
+        return new NativeRocksDBSnapshotResources(
+                snapshotDirectory, previousSnapshot, stateMetaInfoSnapshots);
+    }
+
+    protected abstract PreviousSnapshot snapshotMetaData(
+            long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots);
+
+    private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
+            throws Exception {
+        // create hard links of the live files in the output path
+        try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
+                Checkpoint checkpoint = Checkpoint.create(db)) {
+            checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
+        } catch (Exception ex) {
+            try {
+                outputDirectory.cleanup();
+            } catch (IOException cleanupEx) {
+                ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex);
+            }
+            throw ex;
+        }
+    }
+
+    @Nonnull
+    protected SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId)
+            throws IOException {
+
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            // create a "permanent" snapshot directory for local recovery.
+            LocalRecoveryDirectoryProvider directoryProvider =
+                    localRecoveryConfig
+                            .getLocalStateDirectoryProvider()
+                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
+            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+
+            if (!directory.exists() && !directory.mkdirs()) {
+                throw new IOException(
+                        "Local state base directory for checkpoint "
+                                + checkpointId
+                                + " does not exist and could not be created: "
+                                + directory);
+            }
+
+            // Introduce an extra directory because RocksDB requires a non-existing directory for
+            // native checkpoints.
+            // Append localDirectoryName to avoid directory collisions when two stateful operators
+            // are chained in one task.
+            File rdbSnapshotDir = new File(directory, localDirectoryName);
+            if (rdbSnapshotDir.exists()) {
+                FileUtils.deleteDirectory(rdbSnapshotDir);
+            }
+
+            Path path = rdbSnapshotDir.toPath();
+            // create a "permanent" snapshot directory because local recovery is active.
+            try {
+                return SnapshotDirectory.permanent(path);
+            } catch (IOException ex) {
+                try {
+                    FileUtils.deleteDirectory(directory);
+                } catch (IOException delEx) {
+                    ex = ExceptionUtils.firstOrSuppressed(delEx, ex);
+                }
+                throw ex;
+            }
+        } else {
+            // create a "temporary" snapshot directory because local recovery is inactive.
+            File snapshotDir = new File(instanceBasePath, "chk-" + checkpointId);
+            return SnapshotDirectory.temporary(snapshotDir);
+        }
+    }
+
+    protected void cleanupIncompleteSnapshot(
+            @Nonnull List<StateObject> statesToDiscard,
+            @Nonnull SnapshotDirectory localBackupDirectory) {
+        try {
+            StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
+        } catch (Exception e) {
+            LOG.warn("Could not properly discard states.", e);
+        }
+
+        if (localBackupDirectory.isSnapshotCompleted()) {
+            try {
+                DirectoryStateHandle directoryStateHandle =
+                        localBackupDirectory.completeSnapshotAndGetHandle();
+                if (directoryStateHandle != null) {
+                    directoryStateHandle.discardState();
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not properly discard local state.", e);
+            }
+        }
+    }
+
+    @Nonnull
+    protected SnapshotResult<StreamStateHandle> materializeMetaData(
+            @Nonnull CloseableRegistry snapshotCloseableRegistry,
+            @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            long checkpointId,
+            @Nonnull CheckpointStreamFactory checkpointStreamFactory)
+            throws Exception {
+
+        CheckpointStreamWithResultProvider streamWithResultProvider =
+                localRecoveryConfig.isLocalRecoveryEnabled()
+                        ? CheckpointStreamWithResultProvider.createDuplicatingStream(
+                                checkpointId,
+                                CheckpointedStateScope.EXCLUSIVE,
+                                checkpointStreamFactory,
+                                localRecoveryConfig
+                                        .getLocalStateDirectoryProvider()
+                                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
+                        : CheckpointStreamWithResultProvider.createSimpleStream(
+                                CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
+
+        snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
+
+        try {
+            // no need for compression scheme support because sst-files are already compressed
+            KeyedBackendSerializationProxy<K> serializationProxy =
+                    new KeyedBackendSerializationProxy<>(
+                            keySerializer, stateMetaInfoSnapshots, false);
+
+            DataOutputView out =
+                    new DataOutputViewStreamWrapper(
+                            streamWithResultProvider.getCheckpointOutputStream());
+
+            serializationProxy.write(out);
+
+            if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
+                SnapshotResult<StreamStateHandle> result =
+                        streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
+                streamWithResultProvider = null;
+                return result;
+            } else {
+                throw new IOException("Stream already closed and cannot return a handle.");
+            }
+        } finally {
+            if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
+                IOUtils.closeQuietly(streamWithResultProvider);
+            }
+        }
+    }
+
     @Override
     public abstract void close();
+
+    /** Common operation in native rocksdb snapshot result supplier. */
+    protected abstract class RocksDBSnapshotOperation
+            implements SnapshotResultSupplier<KeyedStateHandle> {
+        /** Id for the current checkpoint. */
+        protected final long checkpointId;
+
+        /** Stream factory that creates the output streams to DFS. */
+        @Nonnull protected final CheckpointStreamFactory checkpointStreamFactory;
+
+        /** The state meta data. */
+        @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+        /** Local directory for the RocksDB native backup. */
+        @Nonnull protected final SnapshotDirectory localBackupDirectory;
+
+        protected RocksDBSnapshotOperation(
+                long checkpointId,
+                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+                @Nonnull SnapshotDirectory localBackupDirectory,
+                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+            this.checkpointId = checkpointId;
+            this.checkpointStreamFactory = checkpointStreamFactory;
+            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+            this.localBackupDirectory = localBackupDirectory;
+        }
+
+        protected Optional<KeyedStateHandle> getLocalSnapshot(
+                @Nullable StreamStateHandle localStreamStateHandle,
+                Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs)
+                throws IOException {
+            final DirectoryStateHandle directoryStateHandle =
+                    localBackupDirectory.completeSnapshotAndGetHandle();
+            if (directoryStateHandle != null && localStreamStateHandle != null) {
+                return Optional.of(
+                        new IncrementalLocalKeyedStateHandle(
+                                backendUID,
+                                checkpointId,
+                                directoryStateHandle,
+                                keyGroupRange,
+                                localStreamStateHandle,
+                                sharedStateHandleIDs));
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    /** A {@link SnapshotResources} for native rocksdb snapshot. */
+    protected static class NativeRocksDBSnapshotResources implements SnapshotResources {
+        @Nonnull protected final SnapshotDirectory snapshotDirectory;
+
+        @Nonnull protected final PreviousSnapshot previousSnapshot;
+
+        @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+        public NativeRocksDBSnapshotResources(
+                SnapshotDirectory snapshotDirectory,
+                PreviousSnapshot previousSnapshot,
+                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+            this.snapshotDirectory = snapshotDirectory;
+            this.previousSnapshot = previousSnapshot;
+            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        }
+
+        @Override
+        public void release() {
+            try {
+                if (snapshotDirectory.exists()) {
+                    LOG.trace(
+                            "Running cleanup for local RocksDB backup directory {}.",
+                            snapshotDirectory);
+                    boolean cleanupOk = snapshotDirectory.cleanup();
+
+                    if (!cleanupOk) {
+                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
+            }
+        }
+    }
+
+    protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
+            new PreviousSnapshot(Collections.emptyMap());
+
+    /** Previous snapshot with uploaded sst files. */
+    protected static class PreviousSnapshot {
+
+        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+
+        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+            this.confirmedSstFiles = confirmedSstFiles;
+        }
+
+        protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
+            if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
+                // we introduce a placeholder state handle, that is replaced with the
+                // original from the shared state registry (created from a previous checkpoint)
+                return Optional.of(
+                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+            } else {
+                // Don't use any uploaded but not confirmed handles because they might be deleted
+                // (by TM) if the previous checkpoint failed. See FLINK-25395
+                return Optional.empty();
+            }
+        }
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
deleted file mode 100644
index c328902ad51..00000000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.snapshot;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.util.ResourceGuard;
-import org.apache.flink.util.function.SupplierWithException;
-
-import org.rocksdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-import java.util.LinkedHashMap;
-
-/**
- * Snapshot strategy to create full snapshots of {@link
- * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. Iterates and writes all
- * states from a RocksDB snapshot of the column families.
- *
- * @param <K> type of the backend keys.
- */
-public class RocksFullSnapshotStrategy<K>
-        extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
-
-    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
-
-    /** This decorator is used to apply compression per key-group for the written snapshot data. */
-    @Nonnull private final StreamCompressionDecorator keyGroupCompressionDecorator;
-
-    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
-            registeredPQStates;
-
-    public RocksFullSnapshotStrategy(
-            @Nonnull RocksDB db,
-            @Nonnull ResourceGuard rocksDBResourceGuard,
-            @Nonnull TypeSerializer<K> keySerializer,
-            @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
-            @Nonnull
-                    LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
-                            registeredPQStates,
-            @Nonnull KeyGroupRange keyGroupRange,
-            @Nonnegative int keyGroupPrefixBytes,
-            @Nonnull LocalRecoveryConfig localRecoveryConfig,
-            @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
-        super(
-                DESCRIPTION,
-                db,
-                rocksDBResourceGuard,
-                keySerializer,
-                kvStateInformation,
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                localRecoveryConfig);
-
-        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
-        this.registeredPQStates = registeredPQStates;
-    }
-
-    @Override
-    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
-        return RocksDBFullSnapshotResources.create(
-                kvStateInformation,
-                registeredPQStates,
-                db,
-                rocksDBResourceGuard,
-                keyGroupRange,
-                keySerializer,
-                keyGroupPrefixBytes,
-                keyGroupCompressionDecorator);
-    }
-
-    @Override
-    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            FullSnapshotResources<K> fullRocksDBSnapshotResources,
-            long checkpointId,
-            long timestamp,
-            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
-            @Nonnull CheckpointOptions checkpointOptions) {
-
-        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
-                        timestamp);
-            }
-            return registry -> SnapshotResult.empty();
-        }
-
-        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
-                checkpointStreamSupplier =
-                        createCheckpointStreamSupplier(
-                                checkpointId, checkpointStreamFactory, checkpointOptions);
-
-        return new FullSnapshotAsyncWriter<>(
-                checkpointOptions.getCheckpointType(),
-                checkpointStreamSupplier,
-                fullRocksDBSnapshotResources);
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-        // nothing to do.
-    }
-
-    @Override
-    public void notifyCheckpointAborted(long checkpointId) {
-        // nothing to do.
-    }
-
-    @Override
-    public void close() {
-        // nothing to do.
-    }
-
-    private SupplierWithException<CheckpointStreamWithResultProvider, Exception>
-            createCheckpointStreamSupplier(
-                    long checkpointId,
-                    CheckpointStreamFactory primaryStreamFactory,
-                    CheckpointOptions checkpointOptions) {
-
-        return localRecoveryConfig.isLocalRecoveryEnabled()
-                        && !checkpointOptions.getCheckpointType().isSavepoint()
-                ? () ->
-                        CheckpointStreamWithResultProvider.createDuplicatingStream(
-                                checkpointId,
-                                CheckpointedStateScope.EXCLUSIVE,
-                                primaryStreamFactory,
-                                localRecoveryConfig
-                                        .getLocalStateDirectoryProvider()
-                                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
-                : () ->
-                        CheckpointStreamWithResultProvider.createSimpleStream(
-                                CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
-    }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 8532e567549..bd464b05677 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -22,52 +22,35 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SnapshotDirectory;
-import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 
-import org.rocksdb.Checkpoint;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -79,6 +62,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
 
 /**
  * Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}
@@ -88,19 +72,13 @@ import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUti
  */
 public class RocksIncrementalSnapshotStrategy<K>
         extends RocksDBSnapshotStrategyBase<
-                K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {
+                K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
 
     private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
 
-    /** Base path of the RocksDB instance. */
-    @Nonnull private final File instanceBasePath;
-
-    /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
-    @Nonnull private final UUID backendUID;
-
     /**
      * Stores the {@link StateHandleID IDs} of uploaded SST files that build the incremental
      * history. Once the checkpoint is confirmed by JM, only the ID paired with {@link
@@ -114,9 +92,6 @@ public class RocksIncrementalSnapshotStrategy<K>
     /** The help class used to upload state files. */
     private final RocksDBStateUploader stateUploader;
 
-    /** The local directory name of the current snapshot strategy. */
-    private final String localDirectoryName;
-
     public RocksIncrementalSnapshotStrategy(
             @Nonnull RocksDB db,
             @Nonnull ResourceGuard rocksDBResourceGuard,
@@ -140,10 +115,9 @@ public class RocksIncrementalSnapshotStrategy<K>
                 kvStateInformation,
                 keyGroupRange,
                 keyGroupPrefixBytes,
-                localRecoveryConfig);
-
-        this.instanceBasePath = instanceBasePath;
-        this.backendUID = backendUID;
+                localRecoveryConfig,
+                instanceBasePath,
+                backendUID);
         this.uploadedStateIDs = new TreeMap<>();
         for (Map.Entry<Long, Map<StateHandleID, StreamStateHandle>> entry :
                 uploadedStateHandles.entrySet()) {
@@ -156,30 +130,11 @@ public class RocksIncrementalSnapshotStrategy<K>
         }
         this.stateUploader = rocksDBStateUploader;
         this.lastCompletedCheckpointId = lastCompletedCheckpointId;
-        this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
-    }
-
-    @Override
-    public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
-            throws Exception {
-
-        final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
-        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
-
-        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
-                new ArrayList<>(kvStateInformation.size());
-        final PreviousSnapshot previousSnapshot =
-                snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
-
-        takeDBNativeCheckpoint(snapshotDirectory);
-
-        return new IncrementalRocksDBSnapshotResources(
-                snapshotDirectory, previousSnapshot, stateMetaInfoSnapshots);
     }
 
     @Override
     public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            IncrementalRocksDBSnapshotResources snapshotResources,
+            NativeRocksDBSnapshotResources snapshotResources,
             long checkpointId,
             long timestamp,
             @Nonnull CheckpointStreamFactory checkpointStreamFactory,
@@ -247,54 +202,8 @@ public class RocksIncrementalSnapshotStrategy<K>
         stateUploader.close();
     }
 
-    @Nonnull
-    private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {
-
-        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
-            // create a "permanent" snapshot directory for local recovery.
-            LocalRecoveryDirectoryProvider directoryProvider =
-                    localRecoveryConfig
-                            .getLocalStateDirectoryProvider()
-                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
-            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
-
-            if (!directory.exists() && !directory.mkdirs()) {
-                throw new IOException(
-                        "Local state base directory for checkpoint "
-                                + checkpointId
-                                + " does not exist and could not be created: "
-                                + directory);
-            }
-
-            // introduces an extra directory because RocksDB wants a non-existing directory for
-            // native checkpoints.
-            // append localDirectoryName here to solve directory collision problem when two stateful
-            // operators chained in one task.
-            File rdbSnapshotDir = new File(directory, localDirectoryName);
-            if (rdbSnapshotDir.exists()) {
-                FileUtils.deleteDirectory(rdbSnapshotDir);
-            }
-
-            Path path = rdbSnapshotDir.toPath();
-            // create a "permanent" snapshot directory because local recovery is active.
-            try {
-                return SnapshotDirectory.permanent(path);
-            } catch (IOException ex) {
-                try {
-                    FileUtils.deleteDirectory(directory);
-                } catch (IOException delEx) {
-                    ex = ExceptionUtils.firstOrSuppressed(delEx, ex);
-                }
-                throw ex;
-            }
-        } else {
-            // create a "temporary" snapshot directory because local recovery is inactive.
-            File snapshotDir = new File(instanceBasePath, "chk-" + checkpointId);
-            return SnapshotDirectory.temporary(snapshotDir);
-        }
-    }
-
-    private PreviousSnapshot snapshotMetaData(
+    @Override
+    protected PreviousSnapshot snapshotMetaData(
             long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
 
         final long lastCompletedCheckpoint;
@@ -325,39 +234,10 @@ public class RocksIncrementalSnapshotStrategy<K>
         return new PreviousSnapshot(confirmedSstFiles);
     }
 
-    private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
-            throws Exception {
-        // create hard links of living files in the output path
-        try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
-                Checkpoint checkpoint = Checkpoint.create(db)) {
-            checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
-        } catch (Exception ex) {
-            try {
-                outputDirectory.cleanup();
-            } catch (IOException cleanupEx) {
-                ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex);
-            }
-            throw ex;
-        }
-    }
-
     /**
      * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
      */
-    private final class RocksDBIncrementalSnapshotOperation
-            implements SnapshotResultSupplier<KeyedStateHandle> {
-
-        /** Id for the current checkpoint. */
-        private final long checkpointId;
-
-        /** Stream factory that creates the output streams to DFS. */
-        @Nonnull private final CheckpointStreamFactory checkpointStreamFactory;
-
-        /** The state meta data. */
-        @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
-        /** Local directory for the RocksDB native backup. */
-        @Nonnull private final SnapshotDirectory localBackupDirectory;
+    private final class RocksDBIncrementalSnapshotOperation extends RocksDBSnapshotOperation {
 
         /** All sst files that were part of the last previously completed checkpoint. */
         @Nonnull private final PreviousSnapshot previousSnapshot;
@@ -372,11 +252,12 @@ public class RocksIncrementalSnapshotStrategy<K>
                 @Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy,
                 @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
 
-            this.checkpointStreamFactory = checkpointStreamFactory;
+            super(
+                    checkpointId,
+                    checkpointStreamFactory,
+                    localBackupDirectory,
+                    stateMetaInfoSnapshots);
             this.previousSnapshot = previousSnapshot;
-            this.checkpointId = checkpointId;
-            this.localBackupDirectory = localBackupDirectory;
-            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.sharingFilesStrategy = sharingFilesStrategy;
         }
 
@@ -395,7 +276,12 @@ public class RocksIncrementalSnapshotStrategy<K>
 
             try {
 
-                metaStateHandle = materializeMetaData(snapshotCloseableRegistry);
+                metaStateHandle =
+                        materializeMetaData(
+                                snapshotCloseableRegistry,
+                                stateMetaInfoSnapshots,
+                                checkpointId,
+                                checkpointStreamFactory);
 
                 // Sanity checks - they should never fail
                 Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
@@ -418,27 +304,16 @@ public class RocksIncrementalSnapshotStrategy<K>
                                 metaStateHandle.getJobManagerOwnedSnapshot(),
                                 checkpointedSize);
 
-                final DirectoryStateHandle directoryStateHandle =
-                        localBackupDirectory.completeSnapshotAndGetHandle();
-                final SnapshotResult<KeyedStateHandle> snapshotResult;
-                if (directoryStateHandle != null
-                        && metaStateHandle.getTaskLocalSnapshot() != null) {
-
-                    IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
-                            new IncrementalLocalKeyedStateHandle(
-                                    backendUID,
-                                    checkpointId,
-                                    directoryStateHandle,
-                                    keyGroupRange,
-                                    metaStateHandle.getTaskLocalSnapshot(),
-                                    sstFiles);
-
-                    snapshotResult =
-                            SnapshotResult.withLocalState(
-                                    jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
-                } else {
-                    snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
-                }
+                Optional<KeyedStateHandle> localSnapshot =
+                        getLocalSnapshot(metaStateHandle.getTaskLocalSnapshot(), sstFiles);
+                final SnapshotResult<KeyedStateHandle> snapshotResult =
+                        localSnapshot
+                                .map(
+                                        keyedStateHandle ->
+                                                SnapshotResult.withLocalState(
+                                                        jmIncrementalKeyedStateHandle,
+                                                        keyedStateHandle))
+                                .orElseGet(() -> SnapshotResult.of(jmIncrementalKeyedStateHandle));
 
                 completed = true;
 
@@ -450,35 +325,7 @@ public class RocksIncrementalSnapshotStrategy<K>
                     statesToDiscard.add(metaStateHandle);
                     statesToDiscard.addAll(miscFiles.values());
                     statesToDiscard.addAll(sstFiles.values());
-                    cleanupIncompleteSnapshot(statesToDiscard);
-                }
-            }
-        }
-
-        private long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) {
-            return streamStateHandles.stream()
-                    .filter(s -> !(s instanceof PlaceholderStreamStateHandle))
-                    .mapToLong(StateObject::getStateSize)
-                    .sum();
-        }
-
-        private void cleanupIncompleteSnapshot(@Nonnull List<StateObject> statesToDiscard) {
-
-            try {
-                StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-            } catch (Exception e) {
-                LOG.warn("Could not properly discard states.", e);
-            }
-
-            if (localBackupDirectory.isSnapshotCompleted()) {
-                try {
-                    DirectoryStateHandle directoryStateHandle =
-                            localBackupDirectory.completeSnapshotAndGetHandle();
-                    if (directoryStateHandle != null) {
-                        directoryStateHandle.discardState();
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Could not properly discard local state.", e);
+                    cleanupIncompleteSnapshot(statesToDiscard, localBackupDirectory);
                 }
             }
         }
@@ -562,113 +409,5 @@ public class RocksIncrementalSnapshotStrategy<K>
                 }
             }
         }
-
-        @Nonnull
-        private SnapshotResult<StreamStateHandle> materializeMetaData(
-                @Nonnull CloseableRegistry snapshotCloseableRegistry) throws Exception {
-
-            CheckpointStreamWithResultProvider streamWithResultProvider =
-                    localRecoveryConfig.isLocalRecoveryEnabled()
-                            ? CheckpointStreamWithResultProvider.createDuplicatingStream(
-                                    checkpointId,
-                                    CheckpointedStateScope.EXCLUSIVE,
-                                    checkpointStreamFactory,
-                                    localRecoveryConfig
-                                            .getLocalStateDirectoryProvider()
-                                            .orElseThrow(
-                                                    LocalRecoveryConfig.localRecoveryNotEnabled()))
-                            : CheckpointStreamWithResultProvider.createSimpleStream(
-                                    CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
-
-            snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
-
-            try {
-                // no need for compression scheme support because sst-files are already compressed
-                KeyedBackendSerializationProxy<K> serializationProxy =
-                        new KeyedBackendSerializationProxy<>(
-                                keySerializer, stateMetaInfoSnapshots, false);
-
-                DataOutputView out =
-                        new DataOutputViewStreamWrapper(
-                                streamWithResultProvider.getCheckpointOutputStream());
-
-                serializationProxy.write(out);
-
-                if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
-                    SnapshotResult<StreamStateHandle> result =
-                            streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-                    streamWithResultProvider = null;
-                    return result;
-                } else {
-                    throw new IOException("Stream already closed and cannot return a handle.");
-                }
-            } finally {
-                if (streamWithResultProvider != null) {
-                    if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
-                        IOUtils.closeQuietly(streamWithResultProvider);
-                    }
-                }
-            }
-        }
-    }
-
-    static class IncrementalRocksDBSnapshotResources implements SnapshotResources {
-        @Nonnull private final SnapshotDirectory snapshotDirectory;
-
-        @Nonnull private final PreviousSnapshot previousSnapshot;
-
-        @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
-        public IncrementalRocksDBSnapshotResources(
-                SnapshotDirectory snapshotDirectory,
-                PreviousSnapshot previousSnapshot,
-                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
-            this.snapshotDirectory = snapshotDirectory;
-            this.previousSnapshot = previousSnapshot;
-            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-        }
-
-        @Override
-        public void release() {
-            try {
-                if (snapshotDirectory.exists()) {
-                    LOG.trace(
-                            "Running cleanup for local RocksDB backup directory {}.",
-                            snapshotDirectory);
-                    boolean cleanupOk = snapshotDirectory.cleanup();
-
-                    if (!cleanupOk) {
-                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
-                    }
-                }
-            } catch (IOException e) {
-                LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
-            }
-        }
-    }
-
-    private static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyMap());
-
-    private static class PreviousSnapshot {
-
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
-
-        private PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
-            this.confirmedSstFiles = confirmedSstFiles;
-        }
-
-        private Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
-            if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
-            } else {
-                // Don't use any uploaded but not confirmed handles because they might be deleted
-                // (by TM) if the previous checkpoint failed. See FLINK-25395
-                return Optional.empty();
-            }
-        }
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
new file mode 100644
index 00000000000..eafffebe4ca
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotDirectory;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
+
+/**
+ * Snapshot strategy for {@link RocksDBKeyedStateBackend} that is based on RocksDB's native
+ * checkpoints and creates full snapshots. The difference from a savepoint is that sst files are
+ * uploaded rather than the states.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksNativeFullSnapshotStrategy<K>
+        extends RocksDBSnapshotStrategyBase<
+                K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
+
+    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
+
+    /** The helper class used to upload state files. */
+    private final RocksDBStateUploader stateUploader;
+
+    public RocksNativeFullSnapshotStrategy(
+            @Nonnull RocksDB db,
+            @Nonnull ResourceGuard rocksDBResourceGuard,
+            @Nonnull TypeSerializer<K> keySerializer,
+            @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
+            @Nonnull KeyGroupRange keyGroupRange,
+            @Nonnegative int keyGroupPrefixBytes,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull File instanceBasePath,
+            @Nonnull UUID backendUID,
+            @Nonnull RocksDBStateUploader rocksDBStateUploader) {
+        super(
+                DESCRIPTION,
+                db,
+                rocksDBResourceGuard,
+                keySerializer,
+                kvStateInformation,
+                keyGroupRange,
+                keyGroupPrefixBytes,
+                localRecoveryConfig,
+                instanceBasePath,
+                backendUID);
+        this.stateUploader = rocksDBStateUploader;
+    }
+
+    @Override
+    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+            NativeRocksDBSnapshotResources snapshotResources,
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+            @Nonnull CheckpointOptions checkpointOptions) {
+
+        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
+            return registry -> SnapshotResult.empty();
+        }
+
+        return new RocksDBNativeFullSnapshotOperation(
+                checkpointId,
+                checkpointStreamFactory,
+                snapshotResources.snapshotDirectory,
+                snapshotResources.stateMetaInfoSnapshots);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long completedCheckpointId) {
+        // nothing to do
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long abortedCheckpointId) {
+        // nothing to do
+    }
+
+    @Override
+    protected PreviousSnapshot snapshotMetaData(
+            long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+        for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
+                kvStateInformation.entrySet()) {
+            stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
+        }
+        return EMPTY_PREVIOUS_SNAPSHOT;
+    }
+
+    @Override
+    public void close() {
+        stateUploader.close();
+    }
+
+    /** Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend. */
+    private final class RocksDBNativeFullSnapshotOperation extends RocksDBSnapshotOperation {
+
+        private RocksDBNativeFullSnapshotOperation(
+                long checkpointId,
+                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+                @Nonnull SnapshotDirectory localBackupDirectory,
+                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+
+            super(
+                    checkpointId,
+                    checkpointStreamFactory,
+                    localBackupDirectory,
+                    stateMetaInfoSnapshots);
+        }
+
+        @Override
+        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
+                throws Exception {
+
+            boolean completed = false;
+
+            // Handle to the meta data file
+            SnapshotResult<StreamStateHandle> metaStateHandle = null;
+            // Handles to all the files in the current snapshot will go here
+            final Map<StateHandleID, StreamStateHandle> privateFiles = new HashMap<>();
+
+            try {
+
+                metaStateHandle =
+                        materializeMetaData(
+                                snapshotCloseableRegistry,
+                                stateMetaInfoSnapshots,
+                                checkpointId,
+                                checkpointStreamFactory);
+
+                // Sanity checks - they should never fail
+                Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
+                Preconditions.checkNotNull(
+                        metaStateHandle.getJobManagerOwnedSnapshot(),
+                        "Metadata for job manager was not properly created.");
+
+                uploadSstFiles(privateFiles, snapshotCloseableRegistry);
+                long checkpointedSize = metaStateHandle.getStateSize();
+                checkpointedSize += getUploadedStateSize(privateFiles.values());
+
+                final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
+                        new IncrementalRemoteKeyedStateHandle(
+                                backendUID,
+                                keyGroupRange,
+                                checkpointId,
+                                Collections.emptyMap(),
+                                privateFiles,
+                                metaStateHandle.getJobManagerOwnedSnapshot(),
+                                checkpointedSize);
+
+                Optional<KeyedStateHandle> localSnapshot =
+                        getLocalSnapshot(
+                                metaStateHandle.getTaskLocalSnapshot(), Collections.emptyMap());
+                final SnapshotResult<KeyedStateHandle> snapshotResult =
+                        localSnapshot
+                                .map(
+                                        keyedStateHandle ->
+                                                SnapshotResult.withLocalState(
+                                                        jmIncrementalKeyedStateHandle,
+                                                        keyedStateHandle))
+                                .orElseGet(() -> SnapshotResult.of(jmIncrementalKeyedStateHandle));
+
+                completed = true;
+
+                return snapshotResult;
+            } finally {
+                if (!completed) {
+                    final List<StateObject> statesToDiscard =
+                            new ArrayList<>(1 + privateFiles.size());
+                    statesToDiscard.add(metaStateHandle);
+                    statesToDiscard.addAll(privateFiles.values());
+                    cleanupIncompleteSnapshot(statesToDiscard, localBackupDirectory);
+                }
+            }
+        }
+
+        private void uploadSstFiles(
+                @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+                @Nonnull CloseableRegistry snapshotCloseableRegistry)
+                throws Exception {
+
+            // write state data
+            Preconditions.checkState(localBackupDirectory.exists());
+
+            Map<StateHandleID, Path> privateFilePaths = new HashMap<>();
+
+            Path[] files = localBackupDirectory.listDirectory();
+            if (files != null) {
+                // every file in the local backup directory is private in a full snapshot
+                for (Path filePath : files) {
+                    final String fileName = filePath.getFileName().toString();
+                    final StateHandleID stateHandleID = new StateHandleID(fileName);
+                    privateFilePaths.put(stateHandleID, filePath);
+                }
+
+                privateFiles.putAll(
+                        stateUploader.uploadFilesToCheckpointFs(
+                                privateFilePaths,
+                                checkpointStreamFactory,
+                                CheckpointedStateScope.EXCLUSIVE,
+                                snapshotCloseableRegistry));
+            }
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
index cf0692c6777..10425708b4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.contrib.streaming.state.snapshot;
 
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collection;
+
 /**
  * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
  * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
@@ -30,4 +36,11 @@ public class RocksSnapshotUtil {
     private RocksSnapshotUtil() {
         throw new AssertionError();
     }
+
+    /** Sums {@link StateObject#getStateSize()} over all handles that are not placeholders. */
+    public static long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) {
+        return streamStateHandles.stream()
+                .mapToLong(h -> h instanceof PlaceholderStreamStateHandle ? 0L : h.getStateSize())
+                .sum();
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index f021cf5306a..46788406c16 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -362,11 +362,6 @@ public class EmbeddedRocksDBStateBackendTest
 
             RocksDB spyDB = keyedStateBackend.db;
 
-            if (!enableIncrementalCheckpointing) {
-                verify(spyDB, times(1)).getSnapshot();
-                verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
-            }
-
             // Ensure every RocksObjects not closed yet
             for (RocksObject rocksCloseable : allCreatedCloseables) {
                 verify(rocksCloseable, times(0)).close();
@@ -662,11 +657,6 @@ public class EmbeddedRocksDBStateBackendTest
         assertNotNull(null, keyedStateBackend.db);
         RocksDB spyDB = keyedStateBackend.db;
 
-        if (!enableIncrementalCheckpointing) {
-            verify(spyDB, times(1)).getSnapshot();
-            verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
-        }
-
         keyedStateBackend.dispose();
         verify(spyDB, times(1)).close();
         assertEquals(true, keyedStateBackend.isDisposed());
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index 927192fa0da..4e4a85aaf66 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -174,7 +174,7 @@ public class RocksIncrementalSnapshotStrategyTest {
             CloseableRegistry closeableRegistry)
             throws Exception {
 
-        RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources snapshotResources =
+        RocksIncrementalSnapshotStrategy.NativeRocksDBSnapshotResources snapshotResources =
                 checkpointSnapshotStrategy.syncPrepareResources(checkpointId);
 
         return (IncrementalRemoteKeyedStateHandle)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
index 005752dda96..dddb6eaa2b3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
@@ -27,4 +27,9 @@ public class FullSnapshotRocksDbTtlStateTest extends RocksDBTtlStateTestBase {
     StateBackend createStateBackend() {
         return createStateBackend(TernaryBoolean.FALSE);
     }
+
+    @Override
+    public boolean isSavepoint() {
+        return false; // NOTE(review): presumably snapshots as a checkpoint - confirm in base class
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
index 7f30e62c712..0cd83003f9c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
@@ -29,7 +29,7 @@ public class IncSnapshotRocksDbTtlStateTest extends RocksDBTtlStateTestBase {
     }
 
     @Override
-    public boolean fullSnapshot() {
+    public boolean isSavepoint() {
         return false;
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index a4851cf51d0..13eaf4403ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -231,6 +231,72 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         }
     }
 
+    /** Backend flag per run: false, true, false; no ZooKeeper, no local recovery. */
+    @Test
+    public void testExternalizedSwitchRocksDBCheckpointsStandalone() throws Exception {
+        final File checkpointDir = temporaryFolder.newFolder();
+        final StateBackend firstBackend = createRocksDBStateBackend(checkpointDir, false);
+        testExternalizedCheckpoints(
+                checkpointDir,
+                null,
+                firstBackend,
+                createRocksDBStateBackend(checkpointDir, true),
+                firstBackend,
+                false,
+                restoreMode);
+    }
+
+    /** Backend flag per run: false, true, false; no ZooKeeper, with local recovery. */
+    @Test
+    public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryStandalone()
+            throws Exception {
+        final File checkpointDir = temporaryFolder.newFolder();
+        final StateBackend firstBackend = createRocksDBStateBackend(checkpointDir, false);
+        testExternalizedCheckpoints(
+                checkpointDir,
+                null,
+                firstBackend,
+                createRocksDBStateBackend(checkpointDir, true),
+                firstBackend,
+                true,
+                restoreMode);
+    }
+
+    /** Backend flag per run: false, true, false; ZooKeeper quorum, no local recovery. */
+    @Test
+    public void testExternalizedSwitchRocksDBCheckpointsZookeeper() throws Exception {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
+            final File checkpointDir = temporaryFolder.newFolder();
+            final StateBackend firstBackend = createRocksDBStateBackend(checkpointDir, false);
+            testExternalizedCheckpoints(
+                    checkpointDir,
+                    zkServer.getConnectString(),
+                    firstBackend,
+                    createRocksDBStateBackend(checkpointDir, true),
+                    firstBackend,
+                    false,
+                    restoreMode);
+        }
+    }
+
+    /** Backend flag per run: false, true, false; ZooKeeper quorum, with local recovery. */
+    @Test
+    public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryZookeeper()
+            throws Exception {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
+            final File checkpointDir = temporaryFolder.newFolder();
+            final StateBackend firstBackend = createRocksDBStateBackend(checkpointDir, false);
+            testExternalizedCheckpoints(
+                    checkpointDir,
+                    zkServer.getConnectString(),
+                    firstBackend,
+                    createRocksDBStateBackend(checkpointDir, true),
+                    firstBackend,
+                    true,
+                    restoreMode);
+        }
+    }
+
     private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException {
         return new FsStateBackend(checkpointDir.toURI().toString(), true);
     }
@@ -248,6 +314,25 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
             boolean localRecovery,
             RestoreMode restoreMode)
             throws Exception {
+        testExternalizedCheckpoints(
+                checkpointDir,
+                zooKeeperQuorum,
+                backend,
+                backend,
+                backend,
+                localRecovery,
+                restoreMode);
+    }
+
+    private static void testExternalizedCheckpoints(
+            File checkpointDir,
+            String zooKeeperQuorum,
+            StateBackend backend1,
+            StateBackend backend2,
+            StateBackend backend3,
+            boolean localRecovery,
+            RestoreMode restoreMode)
+            throws Exception {
 
         final Configuration config = new Configuration();
 
@@ -286,17 +371,17 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         try {
             // main test sequence:  start job -> eCP -> restore job -> eCP -> restore job
             String firstExternalCheckpoint =
-                    runJobAndGetExternalizedCheckpoint(backend, null, cluster, restoreMode);
+                    runJobAndGetExternalizedCheckpoint(backend1, null, cluster, restoreMode);
             assertNotNull(firstExternalCheckpoint);
 
             String secondExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend, firstExternalCheckpoint, cluster, restoreMode);
+                            backend2, firstExternalCheckpoint, cluster, restoreMode);
             assertNotNull(secondExternalCheckpoint);
 
             String thirdExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend,
+                            backend3,
                             // in CLAIM mode, the previous run is only guaranteed to preserve the
                             // latest checkpoint; in NO_CLAIM/LEGACY, even the initial checkpoints
                             // must remain valid
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index ff2706f128c..b5b96c9380e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -75,6 +75,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class SavepointFormatITCase extends TestLogger {
     private static final Logger LOG = LoggerFactory.getLogger(SavepointFormatITCase.class);
 
+    private static final String STATE_BACKEND_ROCKSDB = "ROCKSDB";
+    private static final String STATE_BACKEND_HEAP = "HEAP";
+
     @TempDir Path checkpointsDir;
     @TempDir Path originalSavepointDir;
     @TempDir Path renamedSavepointDir;
@@ -122,7 +125,7 @@ public class SavepointFormatITCase extends TestLogger {
 
     private void validateNativeNonChangelogState(
             KeyedStateHandle state, StateBackendConfig backendConfig) {
-        if (backendConfig.isIncremental()) {
+        if (STATE_BACKEND_ROCKSDB.equals(backendConfig.getName())) {
             assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class));
         } else {
             assertThat(state, instanceOf(KeyGroupsStateHandle.class));
@@ -174,7 +177,7 @@ public class SavepointFormatITCase extends TestLogger {
         return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) {
             @Override
             public String getName() {
-                return "HEAP";
+                return STATE_BACKEND_HEAP;
             }
 
             @Override
@@ -201,7 +204,7 @@ public class SavepointFormatITCase extends TestLogger {
         return new StateBackendConfig(changelogEnabled, incremental) {
             @Override
             public String getName() {
-                return "ROCKSDB";
+                return STATE_BACKEND_ROCKSDB;
             }
 
             @Override