Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/02 07:44:06 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #19331: [FLINK-26985][runtime] Don't discard shared state of restored checkpoints

Myasuka commented on a change in pull request #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r841003426



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
##########
@@ -269,17 +317,19 @@ private void testExternalizedCheckpoints(
         try {
             // main test sequence:  start job -> eCP -> restore job -> eCP -> restore job
             String firstExternalCheckpoint =
-                    runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+                    runJobAndGetExternalizedCheckpoint(
+                            backend, checkpointDir, null, client, restoreMode);
             assertNotNull(firstExternalCheckpoint);
 
             String secondExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend, checkpointDir, firstExternalCheckpoint, client);
+                            backend, checkpointDir, firstExternalCheckpoint, client, restoreMode);
             assertNotNull(secondExternalCheckpoint);
 
             String thirdExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend, checkpointDir, secondExternalCheckpoint, client);
+                            // restore from the 1st external checkpoint path
+                            backend, checkpointDir, firstExternalCheckpoint, client, restoreMode);

Review comment:
       The previous test actually followed these steps:
   create the 1st checkpoint -> restore from the 1st checkpoint and create the 2nd checkpoint -> restore from the 2nd checkpoint and create the 3rd one.
   
   However, this PR changes the original purpose of the test, because the 3rd run would now restore from the 1st checkpoint instead of the 2nd.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -51,6 +53,9 @@
     /** Executor for async state deletion */
     private final Executor asyncDisposalExecutor;
 
+    /** Checkpoint ID below which no state is discarded, inclusive. */
+    private long highestRetainCheckpointID = -1L;

Review comment:
       I think `highestRetainCheckpointID` might not be a good choice, as we could still retain multiple checkpoints even in `CLAIM` restore mode. However, the current implementation would still keep `highestRetainCheckpointID` at `-1` in `CLAIM` restore mode.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
##########
@@ -37,13 +38,15 @@
      * @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry}
      *     objects.
      * @param ioExecutor Executor used to run (async) deletes.
+     * @param restoreMode the job in which the job is restoring

Review comment:
       ```suggestion
        * @param restoreMode the restore mode with which the job is restoring.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -251,13 +267,16 @@ public void run() {
         /** The shared state handle */
         StreamStateHandle stateHandle;
 
+        private final long createdByCheckpointID;

Review comment:
       This field is not included in the `#toString()` method.
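
For illustration, a minimal sketch of a `toString()` that also reports the new field. The class below is a hypothetical, self-contained stand-in (the `String` handle and the class name are assumptions), not the actual entry class from `SharedStateRegistryImpl`, whose other fields and output format may differ.

```java
// Hypothetical stand-in for the registry's entry class; not the actual Flink code.
final class SharedStateEntrySketch {
    /** Stand-in for the StreamStateHandle held by the real entry. */
    private final String stateHandle;

    /** ID of the checkpoint that first registered this state. */
    private final long createdByCheckpointID;

    SharedStateEntrySketch(String stateHandle, long createdByCheckpointID) {
        this.stateHandle = stateHandle;
        this.createdByCheckpointID = createdByCheckpointID;
    }

    @Override
    public String toString() {
        // Include every field so that log lines identify which checkpoint created the state.
        return "SharedStateEntry{"
                + "stateHandle=" + stateHandle
                + ", createdByCheckpointID=" + createdByCheckpointID
                + '}';
    }
}
```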

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -174,6 +181,15 @@ public void registerAll(
         }
     }
 
+    @Override
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
+        registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        if (mode != RestoreMode.CLAIM) {

Review comment:
       Why is `CLAIM` the only mode that does not need to update `highestRetainCheckpointID`? I think this deserves a description.
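
As an illustration of the kind of description meant here, a simplified sketch follows. It assumes the reasoning is that in `CLAIM` mode the job takes ownership of the restored checkpoint, so its shared state may be discarded once unused, while in the other modes the restored checkpoint is not owned by the job and must stay intact. The enum, class, `Math.max` update, and `mayDiscard` helper are hypothetical stand-ins, not the PR's actual code.

```java
// Hypothetical, simplified model of the logic under review; not the actual Flink classes.
enum RestoreModeSketch { CLAIM, NO_CLAIM, LEGACY }

final class RegistrySketch {
    /** State created by checkpoints up to this ID (inclusive) is never discarded. */
    private long highestRetainCheckpointID = -1L;

    /**
     * Records the checkpoint the job was restored from.
     *
     * <p>Assumed reasoning: in CLAIM mode the job owns the restored checkpoint, so its
     * shared state may be discarded once it is no longer referenced and the bound stays
     * unchanged. In all other modes the restored checkpoint is not owned by the job, so
     * all state created up to that checkpoint is protected from deletion.
     */
    void registerAllAfterRestored(long restoredCheckpointID, RestoreModeSketch mode) {
        if (mode != RestoreModeSketch.CLAIM) {
            highestRetainCheckpointID =
                    Math.max(highestRetainCheckpointID, restoredCheckpointID);
        }
    }

    /** Returns true if state created by the given checkpoint may be deleted when unused. */
    boolean mayDiscard(long createdByCheckpointID) {
        return createdByCheckpointID > highestRetainCheckpointID;
    }
}
```

In this sketch, restoring from checkpoint 5 in `NO_CLAIM` mode protects state created by checkpoints 0 through 5, while the same restore in `CLAIM` mode leaves the bound at `-1` and protects nothing.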
   



