Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/19 16:26:31 UTC

[GitHub] [flink] rkhachatryan opened a new pull request, #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

rkhachatryan opened a new pull request, #20313:
URL: https://github.com/apache/flink/pull/20313

   ## What is the purpose of the change
   
   This PR limits checkpoint discard delay in CLAIM restore mode to two cases:
   1. Changelog (off -> on). Adjust the `SharedStateRegistry` API to make it explicit that the state being registered must prevent deletion of the checkpoint that created it (i.e. the initial checkpoint).
   2. RocksDB incremental native savepoint. Rely on the `SharingFilesStrategy` of the restored savepoint to determine whether any shared state can reside inside the `chk-xx` folder (rather than `/shared`).
   
   ## Verifying this change
   
   New tests in `SharedStateRegistryTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r927001242


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());

Review Comment:
   Right, added `testUnregisterNonInitialCheckpoint` test.





[GitHub] [flink] fredia commented on pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #20313:
URL: https://github.com/apache/flink/pull/20313#issuecomment-1195202098

   Except for [`testUnregisterNonInitialCheckpoint`](https://github.com/apache/flink/pull/20313#discussion_r927001242), this PR looks good to me. 
   
   @curcur would you like to take a look as well?




[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934545515


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(

Review Comment:
   I'm not sure I fully understand your comment, but shared state is discarded either when it is no longer used or when a **different** entry is submitted for the same key (that's probably what you meant).
   There is no contract requiring a placeholder; the state backend might also send the same state handle.
   If a different non-placeholder entry is submitted, the outcome depends on whether the existing entry is already part of a completed checkpoint. If it is, the new entry is discarded; otherwise, the existing entry is discarded.
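The rule described in this comment can be illustrated with a minimal sketch. It is a simplified model under stated assumptions, not Flink's `SharedStateRegistryImpl`: the class `DuplicateKeySketch`, its `Handle` and `Entry` types, and the `markCompleted` method are all hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model; not Flink's SharedStateRegistryImpl. */
public class DuplicateKeySketch {

    /** Stand-in for a state handle; only tracks whether it was discarded. */
    static final class Handle {
        boolean discarded;
    }

    /** One registry entry: the handle kept for a key and whether it is confirmed. */
    static final class Entry {
        Handle handle;
        boolean inCompletedCheckpoint;

        Entry(Handle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers a handle under a key and returns the handle the registry keeps. */
    Handle register(String key, Handle candidate) {
        Entry existing = entries.get(key);
        if (existing == null) {
            entries.put(key, new Entry(candidate));
            return candidate;
        }
        if (existing.handle == candidate) {
            // The backend re-sent the same handle: nothing to discard.
            return candidate;
        }
        if (existing.inCompletedCheckpoint) {
            // The existing entry belongs to a completed checkpoint: drop the new one.
            candidate.discarded = true;
            return existing.handle;
        }
        // The existing entry was never confirmed: replace it and drop the old handle.
        existing.handle.discarded = true;
        existing.handle = candidate;
        return candidate;
    }

    /** Marks the entry for a key as belonging to a completed checkpoint. */
    void markCompleted(String key) {
        entries.get(key).inCompletedCheckpoint = true;
    }
}
```

In this model, re-sending the same handle is a no-op, which corresponds to the remark that a placeholder is not required.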





[GitHub] [flink] fredia commented on pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #20313:
URL: https://github.com/apache/flink/pull/20313#issuecomment-1201962209

   Thanks a lot for updating the PR @rkhachatryan, the changes LGTM, could you please squash the commits?




[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r927394019


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   `CheckpointProperties` are saved (in this PR but not in `master`) into `CheckpointMetadata` and then restored and passed as a constructor argument to `CompletedCheckpoint` (that's the reason for adding `MetadataV4Serializer`).





[GitHub] [flink] masteryhx commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934176373


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(

Review Comment:
   I see. I was just thinking about this case.
   Checkpoints 2 and 3 should be placeholders in the real world if no failure occurs, so that the restored state will not be discarded in NO_CLAIM mode, right?
   In your case, it is just overridden by the new reference, so it will work.





[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r925114799


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -162,7 +174,14 @@ public Set<Long> unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
-                } else {
+                } else if (preventsDiscardingCreatedCheckpoint(entry)) {

Review Comment:
   `lowestCheckpointID` is the lowest checkpoint in use, so **in theory** we should add `lowestCheckpointID` to the `checkpointInUse` set. It does not violate correctness now, though, because `CheckpointsCleaner#cleanSubsumedCheckpoints` checks `<upTo` again.
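For readers following the hunk above, here is a minimal sketch of how the new `else if` branch could collect the checkpoints that are still in use. It is a simplified model, not Flink's implementation: the class `InUseSketch`, the `Sharing` enum, and the `registerRestored` method are hypothetical stand-ins, and only the two conditions visible in the quoted `preventsDiscardingCreatedCheckpoint` hunk are modelled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the in-use check; not Flink's implementation. */
public class InUseSketch {

    /** Stand-in for the sharing strategy recorded for a restored checkpoint. */
    enum Sharing { NO_SHARING, FORWARD }

    static final class Entry {
        final long createdByCheckpointId;
        boolean preventDiscardingCreatedCheckpoint;
        long lastUsedCheckpointId;

        Entry(long createdBy, boolean preventDiscarding) {
            this.createdByCheckpointId = createdBy;
            this.lastUsedCheckpointId = createdBy;
            this.preventDiscardingCreatedCheckpoint = preventDiscarding;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final Map<Long, Sharing> restoredSharing = new HashMap<>();

    /** Registers the state of a restored (initial) checkpoint. */
    void registerRestored(long checkpointId, Sharing sharing, String key) {
        restoredSharing.put(checkpointId, sharing);
        entries.put(key, new Entry(checkpointId, false));
    }

    /** Registers a use of the state under the key by the given checkpoint. */
    void registerReference(String key, long checkpointId, boolean preventDiscarding) {
        Entry e = entries.computeIfAbsent(key, k -> new Entry(checkpointId, preventDiscarding));
        e.preventDiscardingCreatedCheckpoint |= preventDiscarding;
        e.lastUsedCheckpointId = Math.max(e.lastUsedCheckpointId, checkpointId);
    }

    /** Removes entries last used before lowestCheckpointId; returns checkpoints still in use. */
    Set<Long> unregisterUnusedState(long lowestCheckpointId) {
        Set<Long> inUse = new HashSet<>();
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.lastUsedCheckpointId < lowestCheckpointId) {
                it.remove();
            } else if (preventsDiscardingCreatedCheckpoint(e)) {
                inUse.add(e.createdByCheckpointId);
            }
        }
        return inUse;
    }

    private boolean preventsDiscardingCreatedCheckpoint(Entry e) {
        // Explicitly requested by the backend, and created by a restored checkpoint.
        if (e.preventDiscardingCreatedCheckpoint
                && restoredSharing.containsKey(e.createdByCheckpointId)) {
            return true;
        }
        // With NO_SHARING, shared state may live inside the checkpoint folder itself.
        return restoredSharing.get(e.createdByCheckpointId) == Sharing.NO_SHARING;
    }
}
```

In this model, a restored checkpoint 1 registered with `NO_SHARING` and referenced by checkpoints 2 and 3 yields `{1}` for `unregisterUnusedState(3)` and an empty set for `unregisterUnusedState(4)`, which mirrors the assertions in the quoted `testUnregisterUnusedSavepointState`.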
   
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   How do we assign these `CheckpointProperties` to the restored checkpoint of changelog?
   
   We can't distinguish whether a checkpoint is restored or not in `ChangelogStateBackendHandle#registerSharedStates()`.
   
   Maybe we can do this in `CheckpointCoordinator` (JM), but that approach would make the JM aware of changelog.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());

Review Comment:
   If "new-state" of checkpoint-4 is still in use by subsequent checkpoints, checkpoint-4 should also be discarded, right?
   
   How about adding this check here?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r933089891


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement

Review Comment:
   Oh, I see. That was generated by the IDE in its format, so it understands that the warning about `if` should be suppressed
   (I'd like to have an explicit `if` branch here).





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r933108302


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement
+        if (NO_SHARING == restoredCheckpointSharingStrategies.get(entry.createdByCheckpointID)) {

Review Comment:
   Yes, the behavior described in the comment for `NO_SHARING` is relevant only until recovery. 
   After the recovery, the state can be re-used.
   
   I don't see any issue with that though; preventing re-use after recovery would result in a slower first checkpoint.





[GitHub] [flink] masteryhx commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r933083299


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement
+        if (NO_SHARING == restoredCheckpointSharingStrategies.get(entry.createdByCheckpointID)) {

Review Comment:
   IIUC, `NO_SHARING` means the checkpoint will not be used by the next checkpoint, so there is no need to delete it lazily if `NO_SHARING` works as its comments describe, right?
   Actually, I think there is an issue with `NO_SHARING` currently. While restoring from `NO_SHARING`, `materializedSstFiles` should be empty, but currently it is not, as you can see in `RocksDBKeyedStateBackendBuilder#build`.
   IIUC, after fixing this, it should be `FORWARD`, not `NO_SHARING`, right?
   Or did I miss something?
   WDYT?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r933099304


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(

Review Comment:
   The test checks the set of checkpoints that are not used anymore. 
   This set doesn't depend on the restore mode.
   
   Restore mode was added to `registerAllAfterRestored` earlier; it affects whether the unused state entry will be discarded or simply removed from the in-memory collection.
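The distinction drawn in this comment, between discarding an unused entry and merely dropping it from memory, can be sketched as follows. This is a hypothetical model and not Flink's code: `RestoreModeSketch`, its `RestoreMode` enum, and the `highestUnclaimedCheckpointId` field are names invented for illustration. The assumption is that only in CLAIM mode does the job own the restored files and therefore delete them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of how restore mode affects unused restored state. */
public class RestoreModeSketch {

    /** Stand-in for the restore mode; only the two modes named in the thread. */
    enum RestoreMode { CLAIM, NO_CLAIM }

    static final class Entry {
        final String handle;
        final long createdBy;
        long lastUsed;

        Entry(String handle, long createdBy) {
            this.handle = handle;
            this.createdBy = createdBy;
            this.lastUsed = createdBy;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    // Assumption: state created at or below this checkpoint ID is not owned by the job.
    private long highestUnclaimedCheckpointId = -1L;

    /** Registers the state of a restored checkpoint under the given restore mode. */
    void registerAfterRestore(long checkpointId, String key, RestoreMode mode) {
        entries.put(key, new Entry(key, checkpointId));
        if (mode != RestoreMode.CLAIM) {
            highestUnclaimedCheckpointId = Math.max(highestUnclaimedCheckpointId, checkpointId);
        }
    }

    /** Drops entries last used before lowestCheckpointId; returns the handles to delete. */
    List<String> unregisterUnusedState(long lowestCheckpointId) {
        List<String> toDiscard = new ArrayList<>();
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.lastUsed < lowestCheckpointId) {
                if (e.createdBy > highestUnclaimedCheckpointId) {
                    // The job owns this state, so it is scheduled for deletion.
                    toDiscard.add(e.handle);
                }
                // In either mode the entry leaves the in-memory collection.
                it.remove();
            }
        }
        return toDiscard;
    }

    /** Number of entries still tracked in memory. */
    int size() {
        return entries.size();
    }
}
```

In both modes the entry disappears from the map, so the set of unused checkpoints is the same; only the list of handles to delete differs.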





[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r927248842


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   Got it, thanks for your clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r927006830


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   When restoring from a non-changelog checkpoint with changelog enabled, `CheckpointProperties` do not affect deletion.
   
   It is the `ChangelogStateBackendHandle` that calls `stateRegistry.registerReference()` with `preventDiscardingCreatedCheckpoint=true`.
   That flag instructs `SharedStateRegistry` not to discard the initial checkpoint as long as changelog uses it.
   
   `CheckpointProperties` are used for the RocksDB Incremental Native Savepoint case.
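   
   The effect of that flag can be illustrated with a minimal sketch. `ToyRegistry`, its `Entry` class, and the string keys are hypothetical stand-ins, not Flink's `SharedStateRegistryImpl`; only the idea (a flagged entry keeps the checkpoint that created it in the returned "in use" set) is taken from this discussion.
   
   ```java
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   /** Toy model of a shared state registry (an assumption for illustration only). */
   final class ToyRegistry {

       /** One shared state entry, reduced to the fields relevant here. */
       static final class Entry {
           final long createdByCheckpointId;
           long lastUsedCheckpointId;
           boolean preventDiscardingCreatedCheckpoint;

           Entry(long createdByCheckpointId) {
               this.createdByCheckpointId = createdByCheckpointId;
               this.lastUsedCheckpointId = createdByCheckpointId;
           }
       }

       private final Map<String, Entry> entries = new HashMap<>();

       /** Registers a use of the state under key by the given checkpoint. */
       void registerReference(String key, long checkpointId, boolean preventDiscarding) {
           Entry entry = entries.computeIfAbsent(key, k -> new Entry(checkpointId));
           entry.lastUsedCheckpointId = Math.max(entry.lastUsedCheckpointId, checkpointId);
           if (preventDiscarding) {
               // Once set, the flag stays set for the lifetime of the entry.
               entry.preventDiscardingCreatedCheckpoint = true;
           }
       }

       /**
        * Drops entries last used before lowestCheckpointId and returns the IDs of
        * checkpoints that must not be discarded yet because a flagged entry is still in use.
        */
       Set<Long> unregisterUnusedState(long lowestCheckpointId) {
           Set<Long> stillInUse = new TreeSet<>();
           Iterator<Entry> it = entries.values().iterator();
           while (it.hasNext()) {
               Entry entry = it.next();
               if (entry.lastUsedCheckpointId < lowestCheckpointId) {
                   it.remove();
               } else if (entry.preventDiscardingCreatedCheckpoint) {
                   stillInUse.add(entry.createdByCheckpointId);
               }
           }
           return stillInUse;
       }
   }
   ```
   
   In this sketch, registering checkpoint 2 against state created by checkpoint 1 with the flag set makes `unregisterUnusedState(2)` report checkpoint 1 as still in use; without the flag the result is empty.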





[GitHub] [flink] curcur commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
curcur commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934351551


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,18 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused

Review Comment:
   Please add more documentation here: currently, the changelog state backend sets this to true and uses delayed deletion.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,18 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement

Review Comment:
   Same here: add a comment saying that NO_SHARING indicates recovery from a savepoint, and explain why delayed deletion is needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] curcur commented on pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
curcur commented on PR #20313:
URL: https://github.com/apache/flink/pull/20313#issuecomment-1200974713

   Besides the comments above, one thing that bothers me a bit is the new checkpoint metadata serializer.
   
   If NO_SHARING behaves as documented, shouldn't it be fine to just delete it, no matter whether a part is shared or private?
   
   It would be great if we could get rid of all the changes to the metadata and to the completed checkpoint data.




[GitHub] [flink] curcur commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
curcur commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934296129


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -96,6 +97,9 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
     /** Properties for this checkpoint. */
     private final CheckpointProperties props;
 
+    /** Properties for this checkpoint. null for older versions. */

Review Comment:
   Please add a statement here on how `restoredProps` differs from `props` above.
   
   From the documentation, they look the same to me.





[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934714748


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -342,6 +342,10 @@ private void advanceLastUsingCheckpointID(long checkpointID) {
         }
 
         private void preventDiscardingCreatedCheckpoint() {
+            // Changed from false to true when a newer checkpoint starts reusing this state entry
+            // after recovery. This is to delay discarding the initial checkpoint until all of its

Review Comment:
   nit: This is to delay discarding the ~initial~ checkpoint until all of its state (both shared and private) is not used.
   
   The **initial** checkpoint is handled by [`restoredCheckpointSharingStrategies`](https://github.com/apache/flink/pull/20313/commits/34fa6de59b115f960e738b146db4b659287f4a8c#diff-87db13d9f5e3cf08a68052f9eba6c97ab22572fb5b5ccb0cabff7104efb21cebR412).
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -192,6 +211,13 @@ public void registerAll(
     @Override
     public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
         registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        checkpoint
+                .getRestoredProperties()

Review Comment:
   When switching changelog off -> on, if the "off" checkpoint is V3, the following branch will not be executed:
   ```Java
   if (entry.preventDiscardingCreatedCheckpoint
           && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
       return true;
   }
   ```





[GitHub] [flink] rkhachatryan merged pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan merged PR #20313:
URL: https://github.com/apache/flink/pull/20313




[GitHub] [flink] flinkbot commented on pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20313:
URL: https://github.com/apache/flink/pull/20313#issuecomment-1189312666

   ## CI report:
   
   * 815fc355232f40adb363bf55031ffb942e7f8a78 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r927248842


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   Got it, thanks for your clarification. I didn't see `CheckpointProperties` being set for the RocksDB Incremental Native Savepoint case. Would you add that to this PR?





[GitHub] [flink] masteryhx commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r932892479


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(

Review Comment:
   I am a bit confused about the test.
   The default restore mode in this test is NO_CLAIM,
   so shouldn't the mock and the deletion not occur in the default mode?
   BTW, I think we may need to test both CLAIM and NO_CLAIM modes.
   WDYT?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934552771


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement
+        if (NO_SHARING == restoredCheckpointSharingStrategies.get(entry.createdByCheckpointID)) {

Review Comment:
   > You mean that SharingFilesStrategy only applies at runtime but not on recovery?
   
   Yes.
   
   > Why do we need to check for NO_SHARING here? Do other strategies also need this logic?
   
   Testing for `NO_SHARING` here allows delaying the discard of the checkpoint folder. The delay is necessary if the checkpoint was created with `NO_SHARING`, because that implies "bundling" all files into the folder, including those potentially re-used after recovery.
   Other strategies don't imply that.
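   
   As a rough sketch of the check described above (a simplified mirror of the diff under review; the class name, the `Strategy` enum, and the plain `Map` parameter are illustrative assumptions, not the actual Flink types):
   
   ```java
   import java.util.Map;

   /** Simplified, hypothetical version of the discard decision discussed in this thread. */
   final class DiscardDecisionSketch {

       /** Illustrative stand-in for the sharing strategy recorded for a restored checkpoint. */
       enum Strategy { FORWARD_BACKWARD, FORWARD, NO_SHARING }

       /**
        * Returns true if the checkpoint that created a state entry must not be discarded yet.
        *
        * @param flaggedByBackend whether a backend explicitly asked to keep the creating checkpoint
        * @param createdByCheckpointId ID of the checkpoint that created the entry
        * @param restored strategies of restored checkpoints, keyed by checkpoint ID
        */
       static boolean preventsDiscardingCreatedCheckpoint(
               boolean flaggedByBackend, long createdByCheckpointId, Map<Long, Strategy> restored) {
           // Case 1: explicitly requested by the backend, and only for restored checkpoints.
           if (flaggedByBackend && restored.containsKey(createdByCheckpointId)) {
               return true;
           }
           // Case 2: with NO_SHARING, re-usable files may live inside the checkpoint folder
           // itself, so the folder has to outlive every checkpoint that still uses them.
           return restored.get(createdByCheckpointId) == Strategy.NO_SHARING;
       }
   }
   ```
   
   With the other strategies the second case never fires, which is why an ordinary retained checkpoint can be discarded without a delay unless a backend sets the flag.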





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934753700


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -192,6 +211,13 @@ public void registerAll(
     @Override
     public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
         registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        checkpoint
+                .getRestoredProperties()

Review Comment:
   Good catch! I've changed the type of map values to `Optional`.
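   
   One way an `Optional`-valued map could cover a restored checkpoint that carries no recorded properties is sketched here. This is an assumption about the shape of the fix, not the code from the PR; `RestoredStrategiesSketch`, `Strategy`, and the method names are hypothetical.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   /** Hypothetical bookkeeping of restored checkpoints and their (possibly unknown) strategy. */
   final class RestoredStrategiesSketch {

       /** Illustrative stand-in for the sharing strategy of a restored checkpoint. */
       enum Strategy { FORWARD_BACKWARD, FORWARD, NO_SHARING }

       private final Map<Long, Optional<Strategy>> byCheckpointId = new HashMap<>();

       /** Records a restored checkpoint; strategyOrNull is null when no properties are available. */
       void registerRestored(long checkpointId, Strategy strategyOrNull) {
           // The key is always stored, so containsKey() works even without a known strategy.
           byCheckpointId.put(checkpointId, Optional.ofNullable(strategyOrNull));
       }

       /** True for every restored checkpoint, with or without a recorded strategy. */
       boolean isRestored(long checkpointId) {
           return byCheckpointId.containsKey(checkpointId);
       }

       /** True only when the recorded strategy is known to be NO_SHARING. */
       boolean bundlesSharedState(long checkpointId) {
           return byCheckpointId
                   .getOrDefault(checkpointId, Optional.empty())
                   .filter(strategy -> strategy == Strategy.NO_SHARING)
                   .isPresent();
       }
   }
   ```
   
   Storing `Optional.empty()` instead of skipping the entry keeps the "is this a restored checkpoint" question separate from the "which strategy did it use" question.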





[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r929700808


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement

Review Comment:
   A typo?  





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r926994558


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -162,7 +174,14 @@ public Set<Long> unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
-                } else {
+                } else if (preventsDiscardingCreatedCheckpoint(entry)) {

Review Comment:
   `lowestCheckpointID` is still valid because it is not subsumed. It is `CheckpointSubsumeHelper` and the checkpoint store that are responsible for subsumption.
    
   This method, however, returns checkpoints that are potentially subsumed but whose state might still be in use by other checkpoints.
   I don't think these responsibilities should be mixed.
   
   Besides that, this seems unrelated to this PR.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r931118444


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement

Review Comment:
   Hmm...I don't see it. What exactly do you mean?





[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r931732557


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement

Review Comment:
   I'm confused about `noinspection RedundantIfStatement`, I guess you mean “don't check RedundantIfStatement”?





[GitHub] [flink] masteryhx commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r934182431


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -365,4 +401,19 @@ public String toString() {
             return "EmptyDiscardStateObject{" + stateHandleID + '}';
         }
     }
+
+    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
+        // explicitly set by the backend, e.g. private state is reused
+        if (entry.preventDiscardingCreatedCheckpoint
+                && restoredCheckpointSharingStrategies.containsKey(entry.createdByCheckpointID)) {
+            return true;
+        }
+        // with NO_SHARING strategy, shared state, if any, is bundled inside the checkpoint folder
+        // noinspection RedundantIfStatement
+        if (NO_SHARING == restoredCheckpointSharingStrategies.get(entry.createdByCheckpointID)) {

Review Comment:
   Okay, thanks for the clarification.
   You mean that `SharingFilesStrategy` only applies at runtime but not on recovery?
   If so, why do we need to check for `NO_SHARING` here? Do other strategies also need this logic?





[GitHub] [flink] rkhachatryan commented on pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20313:
URL: https://github.com/apache/flink/pull/20313#issuecomment-1203726805

   @flinkbot run azure

