Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 03:11:35 UTC

[GitHub] [flink] fredia commented on a diff in pull request #20313: [FLINK-28597][state] Discard initial checkpoints without a delay in common cases

fredia commented on code in PR #20313:
URL: https://github.com/apache/flink/pull/20313#discussion_r925114799


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -162,7 +174,14 @@ public Set<Long> unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
-                } else {
+                } else if (preventsDiscardingCreatedCheckpoint(entry)) {

Review Comment:
   `lowestCheckpointID` is the lowest checkpoint still in use, so **in theory** we should add `lowestCheckpointID` to the `checkpointInUse` set. Omitting it does not violate correctness for now, because `CheckpointsCleaner#cleanSubsumedCheckpoints` checks `<upTo` again.
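
A minimal sketch of why the omission is harmless, assuming the cleaner filters subsumed checkpoints with a strict `< upTo` comparison plus a lookup in the in-use set. `UpToCheckSketch` and `discardable` are hypothetical names for illustration, not Flink's API, and the in-use lookup is an assumption about how the set is consumed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for the cleaner-side filter; this is NOT Flink code. */
final class UpToCheckSketch {

    /** Returns the subsumed checkpoint IDs that would be eligible for discarding. */
    static Set<Long> discardable(List<Long> subsumed, long upTo, Set<Long> stillInUse) {
        Set<Long> result = new TreeSet<>();
        for (long id : subsumed) {
            // The strict "< upTo" comparison already protects upTo itself,
            // whether or not upTo was added to stillInUse.
            if (id < upTo && !stillInUse.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> subsumed = Arrays.asList(1L, 2L, 3L);
        Set<Long> withoutLowest = discardable(subsumed, 3L, Collections.<Long>emptySet());
        Set<Long> withLowest = discardable(subsumed, 3L, Collections.singleton(3L));
        // Both variants select the same checkpoints for discarding.
        System.out.println(withoutLowest.equals(withLowest));
    }
}
```

In this toy model, adding the lowest ID to the in-use set changes nothing, which matches the "not needed now, but correct in theory" observation.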
   
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());
+    }
+
+    @Test
+    public void testUnregisterInitialCheckpoint() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));
+
         sharedStateRegistry.registerReference(
-                new SharedStateRegistryKey("second"), new TestingStreamStateHandle(), 4L);
-        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(3);
-        Set<Long> expectedInUse = new HashSet<>(Arrays.asList(1L, 4L));
-        assertEquals(expectedInUse, stillInUse);
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+
+        assertTrue(
+                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
+                sharedStateRegistry.unregisterUnusedState(2).isEmpty());
+    }
+
+    /** Emulate turning changelog on while recovering from a retained checkpoint. */
+    @Test
+    public void testUnregisterInitialCheckpointUsedInChangelog() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        TestingStreamStateHandle handle = new TestingStreamStateHandle();
+
+        // "normal" restored checkpoint
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forCheckpoint(RETAIN_ON_CANCELLATION));

Review Comment:
   How do we assign these `CheckpointProperties` to the restored checkpoint of the changelog?
   
   We can't distinguish whether a checkpoint is restored or not in `ChangelogStateBackendHandle#registerSharedStates()`.
   
   Maybe we can do this in `CheckpointCoordinator` (JM), but that would make the JM aware of the changelog.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java:
##########
@@ -182,20 +194,113 @@ public void testRegisterChangelogStateBackendHandles() throws InterruptedExcepti
     }
 
     @Test
-    public void testUnregisterUnusedState() {
+    public void testUnregisterUnusedSavepointState() {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 1L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 2L);
-        sharedStateRegistry.registerReference(new SharedStateRegistryKey("first"), handle, 3L);
+
+        registerInitialCheckpoint(
+                sharedStateRegistry,
+                RESTORED_STATE_ID,
+                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
+
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 2L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey(RESTORED_STATE_ID), handle, 3L);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
+
+        assertEquals(
+                "Only the initial checkpoint should be retained because its state is in use",
+                singleton(1L),
+                sharedStateRegistry.unregisterUnusedState(3));
+        assertTrue(
+                "The initial checkpoint state is unused so it could be discarded",
+                sharedStateRegistry.unregisterUnusedState(4).isEmpty());

Review Comment:
   If "new-state" of checkpoint-4 is still in use by subsequent checkpoints, checkpoint-4 should also be discarded, right?
   
   How about adding a check for this here?
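
One way to picture the suggested check is a self-contained toy model. `ToySharedStateRegistry`, its `pinsCreator` flag, and `scenarioFromReview` are hypothetical names; the model only assumes that state restored from a savepoint keeps its creating checkpoint in use, while state created by later checkpoints does not. It is a sketch under those assumptions, not the behaviour of `SharedStateRegistryImpl`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy stand-in for the registry; this is NOT Flink's SharedStateRegistryImpl. */
final class ToySharedStateRegistry {

    private static final class Entry {
        final long createdBy;
        final boolean pinsCreator; // hypothetical flag: state keeps its creating checkpoint in use
        long lastUsed;

        Entry(long createdBy, boolean pinsCreator) {
            this.createdBy = createdBy;
            this.pinsCreator = pinsCreator;
            this.lastUsed = createdBy;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers a reference; the flag only matters when the key is first seen. */
    void register(String key, long checkpointId, boolean pinsCreator) {
        Entry e = entries.get(key);
        if (e == null) {
            entries.put(key, new Entry(checkpointId, pinsCreator));
        } else {
            e.lastUsed = Math.max(e.lastUsed, checkpointId);
        }
    }

    boolean isTracked(String key) {
        return entries.containsKey(key);
    }

    /** Drops entries last used before the given ID; returns creating checkpoints still in use. */
    Set<Long> unregisterUnusedState(long lowestCheckpointId) {
        Set<Long> inUse = new TreeSet<>();
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.lastUsed < lowestCheckpointId) {
                it.remove();
            } else if (e.pinsCreator) {
                inUse.add(e.createdBy);
            }
        }
        return inUse;
    }

    /** Mirrors the savepoint test, plus "new-state" being reused by checkpoint 5. */
    static ToySharedStateRegistry scenarioFromReview() {
        ToySharedStateRegistry registry = new ToySharedStateRegistry();
        registry.register("restored-state", 1L, true); // initial savepoint
        registry.register("restored-state", 2L, false);
        registry.register("restored-state", 3L, false);
        registry.register("new-state", 4L, false);
        registry.register("new-state", 5L, false); // the extra reference proposed in the review
        return registry;
    }
}
```

In this model, `unregisterUnusedState(5)` returns an empty set even though "new-state" is still tracked, which is the extra assertion the comment asks for: checkpoint-4 is not reported as in use merely because a later checkpoint shares its state.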


