Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 12:59:20 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r894476700


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +56,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
 
+    /** All subsumed checkpoints. */
+    private final Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>();

Review Comment:
   Looks like this field can be a `List` now?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {

Review Comment:
   Could you please add javadoc to this method?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,12 +59,13 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
-    void unregisterUnusedState(long lowestCheckpointID);
+    Set<Long> unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   Could you please update the javadoc to describe what this method returns now?
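
A minimal sketch of how the new return value might be documented. The class `ToySharedStateRegistry`, its string keys, and its `long[]` entry layout are hypothetical stand-ins, not the real `SharedStateRegistry`. The meaning of the returned set (IDs of checkpoints whose state is still referenced) is an assumption, inferred from the `stillInUse` parameter of `cleanSubsumedCheckpoints` in this PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical, heavily simplified registry used only to illustrate the javadoc. */
class ToySharedStateRegistry {

    // Value layout (assumption): {createdByCheckpointID, lastUsedByCheckpointID}.
    private final Map<String, long[]> entries = new HashMap<>();

    /** Registers a reference from the given checkpoint to the state stored under the key. */
    void registerReference(String key, long checkpointID) {
        long[] entry = entries.get(key);
        if (entry == null) {
            entries.put(key, new long[] {checkpointID, checkpointID});
        } else {
            entry[1] = Math.max(entry[1], checkpointID);
        }
    }

    /**
     * Unregisters state that is not referenced by the given checkpoint ID or any newer.
     *
     * @param lowestCheckpointID the lowest checkpoint ID that is still valid
     * @return the IDs of the checkpoints that created state which is still referenced
     *     after this call; such checkpoints must not be discarded yet (assumed meaning)
     */
    Set<Long> unregisterUnusedState(long lowestCheckpointID) {
        Set<Long> stillInUse = new HashSet<>();
        Iterator<long[]> iterator = entries.values().iterator();
        while (iterator.hasNext()) {
            long[] entry = iterator.next();
            if (entry[1] < lowestCheckpointID) {
                // No valid checkpoint references this state any more.
                iterator.remove();
            } else {
                stillInUse.add(entry[0]);
            }
        }
        return stillInUse;
    }

    public static void main(String[] args) {
        ToySharedStateRegistry registry = new ToySharedStateRegistry();
        registry.registerReference("a", 1L);
        registry.registerReference("a", 3L); // checkpoint 3 reuses state created by checkpoint 1
        registry.registerReference("b", 2L);
        System.out.println(registry.unregisterUnusedState(3L));
    }
}
```

In this toy model, checkpoint 1 is older than the lowest valid checkpoint but still appears in the result, which is the case a caller would need to know about before discarding subsumed checkpoints.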



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);

Review Comment:
   This test relies on materialization, but the interval is quite big and might prevent it from happening - should it be decreased?
   
   And ideally, we should verify the presence of the materialized part - similar to `ChangelogPeriodicMaterializationITCase`.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(
+                                checkpoint,
+                                checkpoint.shouldBeDiscardedOnSubsume(),
+                                postCleanAction,
+                                executor);
+                        iterator.remove();
+                    } catch (Exception e) {
+                        LOG.warn("Fail to discard the old checkpoint {}.", checkpoint);
+                    }
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void cleanSubsumedCheckpointsWithException(
+            long upTo,
+            Set<Long> stillInUse,
+            Runnable postCleanAction,
+            Executor executor,
+            DiscardException discardException) {

Review Comment:
   IIUC, this method is only used in a test, and that test only verifies this method's behavior. This doesn't make much sense to me; maybe we can just remove both?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));

Review Comment:
   1. Ideally, folder deletion should also be tested in the case of checkpoint subsumption.
   2. Is it guaranteed that the folder is cleaned up by the time `cancelJob.get` returns? If not, the assertion might be flaky.
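
If the cleanup turns out to be asynchronous, one way to avoid both the fixed `sleep` and a flaky assertion is to poll for the condition with a timeout. The sketch below is only an illustration: `FolderDeletionWait` and `waitUntil` are hypothetical names, and the background thread merely simulates a cleanup that finishes some time after cancellation returns. Flink's own test utilities may already offer an equivalent helper.

```java
import java.io.File;
import java.nio.file.Files;
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: wait for a condition instead of sleeping for a fixed time. */
class FolderDeletionWait {

    /**
     * Polls the condition until it holds or the timeout elapses.
     *
     * @return true if the condition held before the timeout, false otherwise
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        File folder = Files.createTempDirectory("chk-").toFile();

        // Simulates an asynchronous cleanup that completes after "cancel" has returned.
        Thread cleaner =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            folder.delete();
                        });
        cleaner.start();

        boolean deleted =
                waitUntil(() -> !folder.exists(), Duration.ofSeconds(10), Duration.ofMillis(20));
        cleaner.join();
        System.out.println("deleted = " + deleted);
    }
}
```

In the test, the final assertion would then check the result of the wait rather than the folder state at a single instant.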



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);

Review Comment:
   There's no guarantee that private state is still needed - it could be that the delegated state backend materialized with completely new private state, right? (just asking)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(

Review Comment:
   I am concerned about `synchronized (subsumedCheckpoints)`:
   `cleanCheckpoint` eventually calls `incrementNumberOfCheckpointsToClean` which is synchronized on `this`. Synchronizing on a single object would make reasoning easier and less error-prone, WDYT?
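
A minimal sketch of the single-lock idea, not the actual `CheckpointsCleaner`. The class `SingleLockCleaner`, its `lock` field, and the use of plain checkpoint IDs instead of `CompletedCheckpoint` objects are hypothetical simplifications. It also keeps the subsumed checkpoints in a `List`, as suggested in the first comment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical cleaner in which one lock guards all mutable state. */
class SingleLockCleaner {

    private final Object lock = new Object();

    // Both fields below are guarded by `lock`.
    private final List<Long> subsumedCheckpointIds = new ArrayList<>();
    private int numberOfCheckpointsToClean;

    void addSubsumedCheckpoint(long checkpointId) {
        synchronized (lock) {
            subsumedCheckpointIds.add(checkpointId);
        }
    }

    /**
     * Removes every subsumed checkpoint with an ID below {@code upTo} that is not still in use
     * and counts it as pending cleanup.
     *
     * @return the IDs selected for cleanup, in insertion order
     */
    List<Long> cleanSubsumedCheckpoints(long upTo, Set<Long> stillInUse) {
        List<Long> selected = new ArrayList<>();
        synchronized (lock) {
            Iterator<Long> iterator = subsumedCheckpointIds.iterator();
            while (iterator.hasNext()) {
                long id = iterator.next();
                if (id < upTo && !stillInUse.contains(id)) {
                    iterator.remove();
                    // The counter is updated under the same lock, so no second monitor is taken.
                    numberOfCheckpointsToClean++;
                    selected.add(id);
                }
            }
        }
        return selected;
    }

    int getNumberOfCheckpointsToClean() {
        synchronized (lock) {
            return numberOfCheckpointsToClean;
        }
    }

    public static void main(String[] args) {
        SingleLockCleaner cleaner = new SingleLockCleaner();
        for (long id : new long[] {1L, 2L, 3L, 5L}) {
            cleaner.addSubsumedCheckpoint(id);
        }
        System.out.println(cleaner.cleanSubsumedCheckpoints(4L, Collections.singleton(2L)));
        System.out.println(cleaner.getNumberOfCheckpointsToClean());
    }
}
```

With a single monitor there is no lock ordering to reason about; the trade-off is that adding a subsumed checkpoint and updating the counter now contend on the same lock.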


