You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/09 12:49:34 UTC

[flink] 01/03: [FLINK-25155] Implement claim snapshot restore mode

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c4735bc0191c270a6911fa35b3af5f6dd3c6d9e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Nov 5 16:15:42 2021 +0100

    [FLINK-25155] Implement claim snapshot restore mode
    
    We add a RestoreMode flag that is passed along with the savepoint path. It determines how a snapshot should be restored. If CLAIM mode is specified, Flink takes ownership of the initial snapshot and treats it as if it were a regular checkpoint; consequently, Flink may delete the snapshot once it is subsumed by newer checkpoints. The default, LEGACY mode, keeps the previous behaviour: Flink does not claim ownership of the snapshot and does not delete its files.
---
 .../generated/savepoint_config_configuration.html  |  6 ++
 .../runtime/checkpoint/CheckpointCoordinator.java  | 31 ++++++--
 .../flink/runtime/checkpoint/Checkpoints.java      |  8 +-
 .../apache/flink/runtime/jobgraph/RestoreMode.java | 50 ++++++++++++
 .../runtime/jobgraph/SavepointConfigOptions.java   | 10 +++
 .../runtime/jobgraph/SavepointRestoreSettings.java | 40 +++++++---
 .../scheduler/DefaultExecutionGraphFactory.java    |  3 +-
 .../checkpoint/CheckpointMetadataLoadingTest.java  | 40 ++++++++--
 .../flink-sql-client/src/test/resources/sql/set.q  |  5 ++
 .../flink/test/checkpointing/SavepointITCase.java  | 92 ++++++++++++++++++++--
 10 files changed, 252 insertions(+), 33 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
index 1487592..62a25a6 100644
--- a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>execution.savepoint-restore-mode</h5></td>
+            <td style="word-wrap: break-word;">LEGACY</td>
+            <td><p>Enum</p></td>
+            <td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"LEGACY": Flink will not claim ownership of the snapshot and will not delete the files. However, it can directly depend on the existence of the files of the restored checkpoint. It might not be safe to delete checkpoints [...]
+        </tr>
+        <tr>
             <td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9d76309..c70f858 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -1629,21 +1630,21 @@ public class CheckpointCoordinator {
     /**
      * Restore the state with given savepoint.
      *
-     * @param savepointPointer The pointer to the savepoint.
-     * @param allowNonRestored True if allowing checkpoint state that cannot be mapped to any job
-     *     vertex in tasks.
+     * @param restoreSettings Settings for a snapshot to restore from. Includes the path and
+     *     parameters for the restore process.
      * @param tasks Map of job vertices to restore. State for these vertices is restored via {@link
      *     Execution#setInitialState(JobManagerTaskRestore)}.
      * @param userClassLoader The class loader to resolve serialized classes in legacy savepoint
      *     versions.
      */
     public boolean restoreSavepoint(
-            String savepointPointer,
-            boolean allowNonRestored,
+            SavepointRestoreSettings restoreSettings,
             Map<JobVertexID, ExecutionJobVertex> tasks,
             ClassLoader userClassLoader)
             throws Exception {
 
+        final String savepointPointer = restoreSettings.getRestorePath();
+        final boolean allowNonRestored = restoreSettings.allowNonRestoredState();
         Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
 
         LOG.info(
@@ -1655,10 +1656,28 @@ public class CheckpointCoordinator {
         final CompletedCheckpointStorageLocation checkpointLocation =
                 checkpointStorageView.resolveCheckpoint(savepointPointer);
 
+        // convert to checkpoint so the system can fall back to it
+        final CheckpointProperties checkpointProperties;
+        switch (restoreSettings.getRestoreMode()) {
+            case CLAIM:
+                checkpointProperties = this.checkpointProperties;
+                break;
+            case LEGACY:
+                checkpointProperties = CheckpointProperties.forSavepoint(false);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown snapshot restore mode");
+        }
+
         // Load the savepoint as a checkpoint into the system
         CompletedCheckpoint savepoint =
                 Checkpoints.loadAndValidateCheckpoint(
-                        job, tasks, checkpointLocation, userClassLoader, allowNonRestored);
+                        job,
+                        tasks,
+                        checkpointLocation,
+                        userClassLoader,
+                        allowNonRestored,
+                        checkpointProperties);
 
         completedCheckpointStore.addCheckpoint(
                 savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 45ff0d1..98f86c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -122,7 +122,8 @@ public class Checkpoints {
             Map<JobVertexID, ExecutionJobVertex> tasks,
             CompletedCheckpointStorageLocation location,
             ClassLoader classLoader,
-            boolean allowNonRestoredState)
+            boolean allowNonRestoredState,
+            CheckpointProperties checkpointProperties)
             throws IOException {
 
         checkNotNull(jobId, "jobId");
@@ -202,9 +203,6 @@ public class Checkpoints {
             }
         }
 
-        // (3) convert to checkpoint so the system can fall back to it
-        CheckpointProperties props = CheckpointProperties.forSavepoint(false);
-
         return new CompletedCheckpoint(
                 jobId,
                 checkpointMetadata.getCheckpointId(),
@@ -212,7 +210,7 @@ public class Checkpoints {
                 0L,
                 operatorStates,
                 checkpointMetadata.getMasterStates(),
-                props,
+                checkpointProperties,
                 location);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
new file mode 100644
index 0000000..62c0170
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Defines how Flink should restore from a given savepoint or retained checkpoint. */
+@PublicEvolving
+public enum RestoreMode implements DescribedEnum {
+    CLAIM(
+            "Flink will take ownership of the given snapshot. It will clean the"
+                    + " snapshot once it is subsumed by newer ones."),
+    LEGACY(
+            "Flink will not claim ownership of the snapshot and will not delete the files. However, "
+                    + "it can directly depend on the existence of the files of the restored checkpoint. "
+                    + "It might not be safe to delete checkpoints that were restored in legacy mode ");
+
+    private final String description;
+
+    RestoreMode(String description) {
+        this.description = description;
+    }
+
+    @Override
+    @Internal
+    public InlineElement getDescription() {
+        return text(description);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index ed50663..17d9730 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -46,4 +46,14 @@ public class SavepointConfigOptions {
                     .withDescription(
                             "Allow to skip savepoint state that cannot be restored. "
                                     + "Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
+    /**
+     * Describes the mode how Flink should restore from the given savepoint or retained checkpoint.
+     */
+    public static final ConfigOption<RestoreMode> RESTORE_MODE =
+            key("execution.savepoint-restore-mode")
+                    .enumType(RestoreMode.class)
+                    .defaultValue(RestoreMode.LEGACY)
+                    .withDescription(
+                            "Describes the mode how Flink should restore from the given"
+                                    + " savepoint or retained checkpoint.");
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 2db66bd..9ab25df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -32,10 +32,8 @@ public class SavepointRestoreSettings implements Serializable {
     private static final long serialVersionUID = 87377506900849777L;
 
     /** No restore should happen. */
-    private static final SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
-
-    /** By default, be strict when restoring from a savepoint. */
-    private static final boolean DEFAULT_ALLOW_NON_RESTORED_STATE = false;
+    private static final SavepointRestoreSettings NONE =
+            new SavepointRestoreSettings(null, false, RestoreMode.LEGACY);
 
     /** Savepoint restore path. */
     private final String restorePath;
@@ -46,15 +44,20 @@ public class SavepointRestoreSettings implements Serializable {
      */
     private final boolean allowNonRestoredState;
 
+    private final RestoreMode restoreMode;
+
     /**
      * Creates the restore settings.
      *
      * @param restorePath Savepoint restore path.
      * @param allowNonRestoredState Ignore unmapped state.
+     * @param restoreMode how to restore from the savepoint
      */
-    private SavepointRestoreSettings(String restorePath, boolean allowNonRestoredState) {
+    private SavepointRestoreSettings(
+            String restorePath, boolean allowNonRestoredState, RestoreMode restoreMode) {
         this.restorePath = restorePath;
         this.allowNonRestoredState = allowNonRestoredState;
+        this.restoreMode = restoreMode;
     }
 
     /**
@@ -86,6 +89,11 @@ public class SavepointRestoreSettings implements Serializable {
         return allowNonRestoredState;
     }
 
+    /** Tells how to restore from the given savepoint. */
+    public RestoreMode getRestoreMode() {
+        return restoreMode;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -129,13 +137,24 @@ public class SavepointRestoreSettings implements Serializable {
     }
 
     public static SavepointRestoreSettings forPath(String savepointPath) {
-        return forPath(savepointPath, DEFAULT_ALLOW_NON_RESTORED_STATE);
+        return forPath(
+                savepointPath,
+                SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue());
     }
 
     public static SavepointRestoreSettings forPath(
             String savepointPath, boolean allowNonRestoredState) {
         checkNotNull(savepointPath, "Savepoint restore path.");
-        return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
+        return new SavepointRestoreSettings(
+                savepointPath,
+                allowNonRestoredState,
+                SavepointConfigOptions.RESTORE_MODE.defaultValue());
+    }
+
+    public static SavepointRestoreSettings forPath(
+            String savepointPath, boolean allowNonRestoredState, RestoreMode restoreMode) {
+        checkNotNull(savepointPath, "Savepoint restore path.");
+        return new SavepointRestoreSettings(savepointPath, allowNonRestoredState, restoreMode);
     }
 
     // -------------------------- Parsing to and from a configuration object
@@ -144,9 +163,11 @@ public class SavepointRestoreSettings implements Serializable {
     public static void toConfiguration(
             final SavepointRestoreSettings savepointRestoreSettings,
             final Configuration configuration) {
-        configuration.setBoolean(
+        configuration.set(
                 SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
                 savepointRestoreSettings.allowNonRestoredState());
+        configuration.set(
+                SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode());
         final String savepointPath = savepointRestoreSettings.getRestorePath();
         if (savepointPath != null) {
             configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
@@ -157,8 +178,9 @@ public class SavepointRestoreSettings implements Serializable {
         final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
         final boolean allowNonRestored =
                 configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+        final RestoreMode restoreMode = configuration.get(SavepointConfigOptions.RESTORE_MODE);
         return savepointPath == null
                 ? SavepointRestoreSettings.none()
-                : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
+                : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored, restoreMode);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 8284572..502b1ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -173,8 +173,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
                 checkpointCoordinator.restoreSavepoint(
-                        savepointRestoreSettings.getRestorePath(),
-                        savepointRestoreSettings.allowNonRestoredState(),
+                        savepointRestoreSettings,
                         executionGraphToRestore.getAllVertices(),
                         userCodeClassLoader);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index c809573..e5a8c87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -71,7 +71,13 @@ public class CheckpointMetadataLoadingTest {
                 createTasks(operatorId, parallelism, parallelism);
 
         final CompletedCheckpoint loaded =
-                Checkpoints.loadAndValidateCheckpoint(jobId, tasks, testSavepoint, cl, false);
+                Checkpoints.loadAndValidateCheckpoint(
+                        jobId,
+                        tasks,
+                        testSavepoint,
+                        cl,
+                        false,
+                        CheckpointProperties.forSavepoint(false));
 
         assertEquals(jobId, loaded.getJobId());
         assertEquals(checkpointId, loaded.getCheckpointID());
@@ -89,7 +95,13 @@ public class CheckpointMetadataLoadingTest {
                 createTasks(operatorId, parallelism, parallelism + 1);
 
         try {
-            Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+            Checkpoints.loadAndValidateCheckpoint(
+                    new JobID(),
+                    tasks,
+                    testSavepoint,
+                    cl,
+                    false,
+                    CheckpointProperties.forSavepoint(false));
             fail("Did not throw expected Exception");
         } catch (IllegalStateException expected) {
             assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
@@ -109,7 +121,13 @@ public class CheckpointMetadataLoadingTest {
         final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
 
         try {
-            Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+            Checkpoints.loadAndValidateCheckpoint(
+                    new JobID(),
+                    tasks,
+                    testSavepoint,
+                    cl,
+                    false,
+                    CheckpointProperties.forSavepoint(false));
             fail("Did not throw expected Exception");
         } catch (IllegalStateException expected) {
             assertTrue(expected.getMessage().contains("allowNonRestoredState"));
@@ -129,7 +147,13 @@ public class CheckpointMetadataLoadingTest {
         final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
 
         final CompletedCheckpoint loaded =
-                Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, true);
+                Checkpoints.loadAndValidateCheckpoint(
+                        new JobID(),
+                        tasks,
+                        testSavepoint,
+                        cl,
+                        true,
+                        CheckpointProperties.forSavepoint(false));
 
         assertTrue(loaded.getOperatorStates().isEmpty());
     }
@@ -152,7 +176,13 @@ public class CheckpointMetadataLoadingTest {
         final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
 
         try {
-            Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+            Checkpoints.loadAndValidateCheckpoint(
+                    new JobID(),
+                    tasks,
+                    testSavepoint,
+                    cl,
+                    false,
+                    CheckpointProperties.forSavepoint(false));
             fail("Did not throw expected Exception");
         } catch (IllegalStateException expected) {
             assertTrue(expected.getMessage().contains("allowNonRestoredState"));
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a81b009..02a41f2 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -39,6 +39,7 @@ CREATE TABLE hive_table (
 # list the configured configuration
 set;
 'execution.attached' = 'true'
+'execution.savepoint-restore-mode' = 'LEGACY'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -56,6 +57,7 @@ reset;
 
 set;
 'execution.attached' = 'true'
+'execution.savepoint-restore-mode' = 'LEGACY'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -88,6 +90,7 @@ Was expecting one of:
 
 set;
 'execution.attached' = 'true'
+'execution.savepoint-restore-mode' = 'LEGACY'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -107,6 +110,7 @@ reset 'execution.attached';
 
 set;
 'execution.attached' = 'true'
+'execution.savepoint-restore-mode' = 'LEGACY'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
@@ -127,6 +131,7 @@ $VAR_UDF_JAR_PATH
 
 set;
 'execution.attached' = 'true'
+'execution.savepoint-restore-mode' = 'LEGACY'
 'execution.savepoint.ignore-unclaimed-state' = 'false'
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index b791873..a637b9c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -309,6 +310,56 @@ public class SavepointITCase extends TestLogger {
     }
 
     @Test
+    public void testTriggerSavepointAndResumeWithClaim() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
+
+        final MiniClusterResourceFactory clusterFactory =
+                new MiniClusterResourceFactory(
+                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+
+        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
+        verifySavepoint(parallelism, savepointPath);
+        restoreJobAndVerifyState(
+                clusterFactory,
+                parallelism,
+                SavepointRestoreSettings.forPath(savepointPath, false, RestoreMode.CLAIM),
+                cluster -> {
+                    cluster.after();
+
+                    assertFalse(
+                            "Savepoint not properly cleaned up.",
+                            new File(new URI(savepointPath)).exists());
+                });
+    }
+
+    @Test
+    public void testTriggerSavepointAndResumeWithLegacyMode() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
+
+        final MiniClusterResourceFactory clusterFactory =
+                new MiniClusterResourceFactory(
+                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+
+        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
+        verifySavepoint(parallelism, savepointPath);
+        restoreJobAndVerifyState(
+                clusterFactory,
+                parallelism,
+                SavepointRestoreSettings.forPath(savepointPath, false, RestoreMode.LEGACY),
+                cluster -> {
+                    cluster.after();
+
+                    assertTrue(
+                            "Savepoint unexpectedly cleaned up.",
+                            new File(new URI(savepointPath)).exists());
+                });
+    }
+
+    @Test
     public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath()
             throws Exception {
         final int numTaskManagers = 2;
@@ -345,7 +396,16 @@ public class SavepointITCase extends TestLogger {
         final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
         assertThat(savepointDir, hasEntropyInFileStateHandlePaths());
 
-        restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+        restoreJobAndVerifyState(
+                clusterFactory,
+                parallelism,
+                SavepointRestoreSettings.forPath(savepointPath),
+                cluster -> {
+                    final URI localURI = new URI(savepointPath.replace("test-entropy:/", "file:/"));
+                    assertTrue("Savepoint has not been created", new File(localURI).exists());
+                    cluster.getClusterClient().disposeSavepoint(savepointPath).get();
+                    assertFalse("Savepoint not properly cleaned up.", new File(localURI).exists());
+                });
     }
 
     private Configuration getCheckpointingWithEntropyConfig() {
@@ -407,9 +467,31 @@ public class SavepointITCase extends TestLogger {
     private void restoreJobAndVerifyState(
             String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism)
             throws Exception {
+        restoreJobAndVerifyState(
+                clusterFactory,
+                parallelism,
+                SavepointRestoreSettings.forPath(savepointPath, false),
+                cluster -> {
+                    cluster.getClusterClient().disposeSavepoint(savepointPath).get();
+                    assertFalse(
+                            "Savepoint not properly cleaned up.",
+                            new File(new URI(savepointPath)).exists());
+                });
+    }
+
+    @FunctionalInterface
+    interface PostCancelChecker {
+        void check(MiniClusterWithClientResource cluster) throws Exception;
+    }
+
+    private void restoreJobAndVerifyState(
+            MiniClusterResourceFactory clusterFactory,
+            int parallelism,
+            SavepointRestoreSettings savepointRestoreSettings,
+            PostCancelChecker postCancelChecks)
+            throws Exception {
         final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
-        jobGraph.setSavepointRestoreSettings(
-                SavepointRestoreSettings.forPath(savepointPath, false));
+        jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
         final JobID jobId = jobGraph.getJobID();
         StatefulCounter.resetForTest(parallelism);
 
@@ -435,9 +517,7 @@ public class SavepointITCase extends TestLogger {
                     status -> status == JobStatus.CANCELED,
                     TestingUtils.defaultScheduledExecutor());
 
-            client.disposeSavepoint(savepointPath).get();
-
-            assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
+            postCancelChecks.check(cluster);
         } finally {
             cluster.after();
             StatefulCounter.resetForTest(parallelism);