Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 08:13:21 UTC

[GitHub] [flink] Myasuka commented on a diff in pull request #19331: [FLINK-26985][runtime] Don't discard shared state of restored checkpoints

Myasuka commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r843592451


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -32,11 +33,11 @@
 
     /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
     SharedStateRegistryFactory DEFAULT_FACTORY =
-            (deleteExecutor, checkpoints) -> {
+            (deleteExecutor, checkpoints, restoreMode) -> {
                 SharedStateRegistry sharedStateRegistry =
                         new SharedStateRegistryImpl(deleteExecutor);
                 for (CompletedCheckpoint checkpoint : checkpoints) {
-                    checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+                    checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode);

Review Comment:
   What will happen if the JM crashes and a new JM is launched? Would all checkpoints registered in `NO_CLAIM` or `LEGACY` mode then be retained forever?
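To make the failover question concrete, here is a toy model, not Flink code. `ToyRestoreMode`, `ToyRegistry` and `FailoverScenario` are hypothetical names, and the registry keeps only a single "reserved up to" checkpoint ID instead of per-handle reference tracking. It assumes the factory re-registers every checkpoint found in the store using the job's original restore mode, as the lambda in this diff does.

```java
import java.util.List;

/** Hypothetical stand-in for RestoreMode, for illustration only. */
enum ToyRestoreMode { CLAIM, NO_CLAIM, LEGACY }

/** Hypothetical, heavily simplified registry: remembers only the highest checkpoint ID it must keep. */
class ToyRegistry {
    private long reservedUpToId = -1L; // -1: nothing reserved

    void registerAfterRestored(long checkpointId, ToyRestoreMode mode) {
        // In NO_CLAIM / LEGACY mode, restored state must not be deleted by this job.
        if (mode != ToyRestoreMode.CLAIM) {
            reservedUpToId = Math.max(reservedUpToId, checkpointId);
        }
    }

    /** True if state first registered by this checkpoint may be deleted once unused. */
    boolean mayDiscard(long checkpointId) {
        return checkpointId > reservedUpToId;
    }
}

class FailoverScenario {
    /** Mimics the factory: every checkpoint in the store is registered "after restore". */
    static ToyRegistry rebuildOnJobManagerFailover(List<Long> checkpointsInStore, ToyRestoreMode mode) {
        ToyRegistry registry = new ToyRegistry();
        for (long id : checkpointsInStore) {
            registry.registerAfterRestored(id, mode);
        }
        return registry;
    }
}
```

Suppose a job is restored in `NO_CLAIM` from checkpoint 10 and later completes 11 and 12. In this model, after a JM failover the rebuilt registry reserves everything up to 12, so state that the job created itself (in 11 and 12) is no longer eligible for cleanup. Reserving only the checkpoint the job was originally restored from would be one possible way around this.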



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -66,10 +67,25 @@ StreamStateHandle registerReference(
     /**
      * Register given shared states in the registry.
      *
+     * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after recovery), please use
+     * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+     *
      * @param stateHandles The shared states to register.
      * @param checkpointID which uses the states.
      */
     void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID);
 
+    /**
+     * Set the lowest checkpoint ID below which no state is discarded, inclusive.
+     *
+     * <p>After recovery from an incremental checkpoint, its state should NOT be discarded, even if
+     * {@link #unregisterUnusedState(long) not used} anymore (unless recovering in {@link
+     * RestoreMode#CLAIM CLAIM} mode).
+     *
+     * <p>This should hold for both cases: when recovering from that initial checkpoint; and from
+     * any subsequent checkpoint derived from it.
+     */
+    void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode);

Review Comment:
   As I asked above, can we ensure that this method is only called when restoring from a checkpoint/savepoint?
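One way to enforce this is to fail fast once regular registration has started. The sketch below is hypothetical: `RestorePhaseGuardedRegistry`, its boolean `claimMode` parameter and the "restore phase ends at the first `registerAll`" rule are assumptions for illustration, not part of this PR.

```java
/**
 * Hypothetical sketch: "after restored" registrations are accepted only before
 * the first regular registerAll call; afterwards they fail fast.
 */
class RestorePhaseGuardedRegistry {
    private boolean restorePhaseOver = false;
    private long reservedCheckpointId = -1L; // -1: nothing reserved
    private long lastRegularCheckpointId = -1L;

    void registerAllAfterRestored(long checkpointId, boolean claimMode) {
        if (restorePhaseOver) {
            throw new IllegalStateException(
                    "registerAllAfterRestored called after regular registration started (checkpoint "
                            + checkpointId + ")");
        }
        if (!claimMode) {
            // NO_CLAIM / LEGACY: restored state must be kept.
            reservedCheckpointId = Math.max(reservedCheckpointId, checkpointId);
        }
    }

    void registerAll(long checkpointId) {
        // The first regular registration ends the restore phase.
        restorePhaseOver = true;
        lastRegularCheckpointId = checkpointId;
        // Registration of the actual shared state handles is omitted in this sketch.
    }

    long reservedCheckpointId() {
        return reservedCheckpointId;
    }

    long lastRegularCheckpointId() {
        return lastRegularCheckpointId;
    }
}
```

Because the factory builds a fresh registry on every (re)start, such a guard would not by itself answer the failover question raised on the factory change.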



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java:
##########
@@ -53,4 +53,6 @@
     public InlineElement getDescription() {
         return text(description);
     }
+
+    public static final RestoreMode DEFAULT = CLAIM;

Review Comment:
   The default restore mode, according to `execution.savepoint-restore-mode`, should be `NO_CLAIM`.
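A simplified stand-in for the enum, with the suggested default, could look like this. `RestoreModeSketch` is a hypothetical name, the constant descriptions are paraphrased in comments rather than copied from Flink, and the enum's `InlineElement` description machinery is left out.

```java
/** Hypothetical, simplified stand-in for org.apache.flink.runtime.jobgraph.RestoreMode. */
enum RestoreModeSketch {
    /** Flink takes ownership of the restored snapshot and may delete it later. */
    CLAIM,
    /** Flink does not take ownership; the restored snapshot is never deleted by the job. */
    NO_CLAIM,
    /** Pre-existing behavior: no ownership is taken, but files may still be shared. */
    LEGACY;

    /** Should agree with the default of execution.savepoint-restore-mode. */
    public static final RestoreModeSketch DEFAULT = NO_CLAIM;
}
```

Having the config option's default and this constant refer to a single value would keep them from drifting apart.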



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -51,6 +53,9 @@
     /** Executor for async state deletion */
     private final Executor asyncDisposalExecutor;
 
+    /** Checkpoint ID below which no state is discarded, inclusive. */
+    private long highestRetainCheckpointID = -1L;

Review Comment:
   I feel the name `retain` could be confused with the configuration option `state.checkpoints.num-retained`. How about `reservedCheckpointIdOnRestore`, with a clear description that `-1` means no checkpoint needs to be reserved on restore (i.e. CLAIM mode)?
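With the suggested name, the field and its sentinel might read as follows. This is a hypothetical fragment: `ReservedOnRestoreRegistry`, `NO_RESERVED_CHECKPOINT`, `reserveOnRestore` and `isDiscardable` are illustrative names, and only the discard decision is modeled.

```java
/** Hypothetical fragment of a shared state registry, showing only the discard decision. */
class ReservedOnRestoreRegistry {
    /** Sentinel: no checkpoint needs to be reserved on restore (the CLAIM case). */
    static final long NO_RESERVED_CHECKPOINT = -1L;

    /**
     * State registered by checkpoints with an ID up to and including this value is never
     * discarded, because it came from a checkpoint restored in NO_CLAIM or LEGACY mode.
     */
    private long reservedCheckpointIdOnRestore = NO_RESERVED_CHECKPOINT;

    void reserveOnRestore(long restoredCheckpointId) {
        reservedCheckpointIdOnRestore = restoredCheckpointId;
    }

    /** Returns true if state first registered by the given checkpoint may be deleted once unused. */
    boolean isDiscardable(long registeringCheckpointId) {
        return registeringCheckpointId > reservedCheckpointIdOnRestore;
    }
}
```

With the `-1` sentinel, every non-negative checkpoint ID is discardable, which matches CLAIM mode without needing a separate flag.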



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.