Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/07 07:53:38 UTC

[GitHub] [flink] curcur commented on a diff in pull request #19331: [FLINK-26985][runtime] Don't discard shared state of restored checkpoints

curcur commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r844713653


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -207,11 +208,13 @@ public long getStateSize() {
      * checkpoint is added into the store.
      *
      * @param sharedStateRegistry The registry where shared states are registered
+     * @param restoreMode the mode in which this checkpoint was restored from
      */
-    public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
+    public void registerSharedStatesAfterRestored(
+            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {

Review Comment:
   In NO_CLAIM mode, does this register (or is it expected to register) any SHARED state that belongs to **previous jobs**?
   
   This may be related to my question on `SharedStateRegistry#registerAllAfterRestored`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -174,6 +181,20 @@ public void registerAll(
         }
     }
 
+    @Override
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
+        registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        // In NO_CLAIM and LEGACY restore modes, shared state of the initial checkpoints must be
+        // preserved. This is achieved by advancing highestRetainCheckpointID here, and then
+        // checking entry.createdByCheckpointID against it on checkpoint subsumption.
+        // In CLAIM restore mode, the shared state of the initial checkpoints must be
+        // discarded as soon as it becomes unused - so highestRetainCheckpointID is not updated.
+        if (mode != RestoreMode.CLAIM) {

Review Comment:
   I think only LEGACY mode needs to keep artifacts from the previous checkpoint here?
   
   1. In CLAIM mode, as explained above, the initial checkpoint already belongs to the current job, so it has to be included in the job's checkpoint lifecycle as well.
   2. In NO_CLAIM mode, the previous checkpoint is copied, or a full copy of it is made, to serve as the initial checkpoint.
   
   This is the whole point of introducing the CLAIM and NO_CLAIM modes: they draw a clear boundary of checkpoint ownership between different jobs.
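   
   To make the question concrete, here is a rough sketch of the alternative condition, assuming the reading above is right. `SharedStateRegistrySketch` and `mayDiscard` are hypothetical stand-ins, not the actual `SharedStateRegistryImpl` code; only `highestRetainCheckpointID` and `createdByCheckpointID` are taken from the diff, and the comparison in `mayDiscard` is my assumption about how the subsumption check works.

```java
/** Hypothetical stand-in for the registry; not the actual SharedStateRegistryImpl. */
class SharedStateRegistrySketch {

    /** Stand-in for org.apache.flink.runtime.jobgraph.RestoreMode. */
    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    // Shared state created by checkpoints up to this ID is never discarded.
    private long highestRetainCheckpointID = -1L;

    /** Advances the retain mark only if the restored checkpoint is still owned elsewhere. */
    void registerAllAfterRestored(long checkpointId, RestoreMode mode) {
        // Alternative condition under discussion: LEGACY only,
        // instead of `mode != RestoreMode.CLAIM` as in the diff.
        if (mode == RestoreMode.LEGACY) {
            highestRetainCheckpointID = Math.max(highestRetainCheckpointID, checkpointId);
        }
    }

    /** Assumed subsumption check: an entry may be discarded only above the retain mark. */
    boolean mayDiscard(long createdByCheckpointID) {
        return createdByCheckpointID > highestRetainCheckpointID;
    }

    public static void main(String[] args) {
        // Restore from checkpoint 5 in each mode and report whether its state is discardable.
        for (RestoreMode mode : RestoreMode.values()) {
            SharedStateRegistrySketch registry = new SharedStateRegistrySketch();
            registry.registerAllAfterRestored(5L, mode);
            System.out.println(mode + ": discardable = " + registry.mayDiscard(5L));
        }
    }
}
```

   With this variant, only a LEGACY restore pins the shared state of the restored checkpoint. Whether that is safe for NO_CLAIM depends on the answer to the ownership question above.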



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java:
##########
@@ -53,4 +53,6 @@
     public InlineElement getDescription() {
         return text(description);
     }
+
+    public static final RestoreMode DEFAULT = NO_CLAIM;

Review Comment:
   Is there a place where we can read the default from the config directly instead of hard-coding it here?
   
   A hard-coded value is easy to miss if we later want to change the default.
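   
   Roughly what I have in mind, as a sketch only: the default is declared once on the config option, and every caller resolves it from there. `Option`, the key `restore-mode`, and `restoreMode(...)` are hypothetical names for illustration, not Flink's actual configuration API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: keep the default restore mode in one place, on the option itself. */
class RestoreModeDefaultSketch {

    /** Stand-in for org.apache.flink.runtime.jobgraph.RestoreMode. */
    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    /** Minimal stand-in for a typed config option: a key plus its default value. */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // The only place where the default is spelled out (key name is hypothetical).
    static final Option<RestoreMode> RESTORE_MODE =
            new Option<>("restore-mode", RestoreMode.NO_CLAIM);

    /** Resolves the restore mode, falling back to the option's default when unset. */
    static RestoreMode restoreMode(Map<String, String> conf) {
        String raw = conf.get(RESTORE_MODE.key);
        return raw == null ? RESTORE_MODE.defaultValue : RestoreMode.valueOf(raw);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("unset: " + restoreMode(conf));
        conf.put(RESTORE_MODE.key, "CLAIM");
        System.out.println("set:   " + restoreMode(conf));
    }
}
```

   That way a later change to the default touches a single line, and nothing else has to be kept in sync with it.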


