Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/13 02:42:20 UTC

[GitHub] [flink] fredia opened a new pull request, #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

fredia opened a new pull request, #19448:
URL: https://github.com/apache/flink/pull/19448

   
   ## What is the purpose of the change
   
   *Currently, restoring from a non-changelog checkpoint is disallowed. The reason is that, when restoring from a non-changelog checkpoint with the changelog state backend enabled in CLAIM mode, the restored checkpoint may be discarded on subsumption, and discarding the subsumed checkpoint might cause materialized artifacts to be dropped. This PR adds support for restoring from a non-changelog checkpoint with changelog enabled in CLAIM mode.*
   
   
   ## Brief change log
   
     - *Revert https://github.com/apache/flink/pull/18735*
     - *Cast `keyedStateHandle` to `ChangelogStateBackendHandle` on JM when changelog is enabled*
     
   
   
   ## Verifying this change
   
   This change added one test and can be verified as follows:
     - *`ChangelogRestoreITCase#testSwitchFromDisablingToEnablingInClaimMode()`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes, recovery on JM)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853979792


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   I'm not sure whether it's always possible to infer whether the changelog is enabled or not from `executionGraphToRestore.getStateBackendName()`.
   In particular, will this work if `flink-conf.yaml` contains `state.backend.changelog.enabled=true` and the job doesn't specify changelog or even any backend?
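
To make the question concrete, here is a minimal standalone sketch of the check from the diff above. It is not Flink code: the class `ChangelogDetectionSketch` and its method are hypothetical, and only the string comparison is taken from the diff. It shows that the result depends entirely on which backend name was recorded on the execution graph, which is the point the comment raises.

```java
import java.util.Optional;

/** Hypothetical stand-in for the check in the diff; not part of Flink. */
final class ChangelogDetectionSketch {

    /** True only when a backend name is present and equals "ChangelogStateBackend". */
    static boolean changelogEnabled(Optional<String> stateBackendName) {
        return stateBackendName.filter("ChangelogStateBackend"::equals).isPresent();
    }

    public static void main(String[] args) {
        // A recorded changelog backend name is detected.
        System.out.println(changelogEnabled(Optional.of("ChangelogStateBackend")));
        // Any other recorded name, or no name at all, is treated as "changelog disabled".
        System.out.println(changelogEnabled(Optional.of("HashMapStateBackend")));
        System.out.println(changelogEnabled(Optional.empty()));
    }
}
```

Whether the recorded name reflects a cluster-level `state.backend.changelog.enabled` setting is exactly what the sketch cannot answer; that depends on how and where the name is set.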





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874957754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   > can we get rid of the same call in ChangelogStateBackendHandle?
   
   We can't: the key of the changelog handle and the key of the `KeyedStateHandle` inside the changelog handle are not the same.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r901867846


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);

Review Comment:
   I see, thanks for clarifying.





[GitHub] [flink] rkhachatryan merged pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan merged PR #19448:
URL: https://github.com/apache/flink/pull/19448




[GitHub] [flink] rkhachatryan commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1163264334

   I've compared performance results. The top regressions are:
   ```
   org.apache.flink.benchmark.MultipleInputBenchmark.multiInputOneIdleMapSink	15.28%
   org.apache.flink.benchmark.InputBenchmark.mapRebalanceMapSink	11.73%
   org.apache.flink.benchmark.MultiInputCheckpointingTimeBenchmark.checkpointMultiInput	7.01%
   org.apache.flink.benchmark.WindowBenchmarks.globalWindow	5.72%
   org.apache.flink.benchmark.MemoryStateBackendBenchmark.stateBackends	3.84%
   org.apache.flink.benchmark.SortingBoundedInputBenchmarks.sortedMultiInput	3.59%
   org.apache.flink.benchmark.MemoryStateBackendBenchmark.stateBackends	3.58%
   org.apache.flink.benchmark.TwoInputBenchmark.twoInputMapSink	3.42%
   org.apache.flink.benchmark.WindowBenchmarks.tumblingWindow	3.29%
   org.apache.flink.benchmark.AsyncWaitOperatorBenchmark.asyncWait	2.42%
   ```
   
   These are caused either by noisy benchmarks (e.g. `multiInputOneIdleMapSink`), in which case they lie within the usual bounds, or, probably, by a recent regression.
   
   So I'd suggest waiting for the results of the investigation of the latter, and then re-running the benchmark.




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r882360519


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        // only used for some tests
+        unregisterState(lowestCheckpointID, (x) -> {});
+    }
+
+    @Override
+    public void unregisterUnusedStateAndCheckpoint(
+            long lowestCheckpointID,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanUp,
+            @Nullable Executor executor) {
+        Set<Long> checkpointInUse = new HashSet<>();
+        unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id));
+        if (executor != null) {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, executor);
+        } else {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, asyncDisposalExecutor);

Review Comment:
   That makes sense to me, fixed
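
The interplay in the diff above (surviving shared state marks its creating checkpoint as "in use", and only the remaining subsumed checkpoints are cleaned) can be sketched as a simplified standalone model. This is not the Flink implementation: `RegistrySketch`, `Entry`, and `cleanableCheckpoints` are hypothetical names, and both the removal condition (`lastUsedCheckpointId < lowestCheckpointId`) and the cleaning rule are assumptions, since the diff does not show them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/** Simplified, hypothetical model of the registry logic; not the Flink classes. */
final class RegistrySketch {

    /** One shared state entry: which checkpoint created it and which one used it last. */
    static final class Entry {
        final long createdByCheckpointId;
        long lastUsedCheckpointId;

        Entry(long checkpointId) {
            this.createdByCheckpointId = checkpointId;
            this.lastUsedCheckpointId = checkpointId;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();

    /** Registers a key, or records that a newer checkpoint still uses it. */
    void register(String key, long checkpointId) {
        Entry entry = registeredStates.get(key);
        if (entry == null) {
            registeredStates.put(key, new Entry(checkpointId));
        } else {
            entry.lastUsedCheckpointId = Math.max(entry.lastUsedCheckpointId, checkpointId);
        }
    }

    /** Drops entries last used before lowestCheckpointId (assumed rule); reports creators of the rest. */
    void unregisterState(long lowestCheckpointId, LongConsumer markCheckpointInUse) {
        Iterator<Entry> it = registeredStates.values().iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.lastUsedCheckpointId < lowestCheckpointId) {
                it.remove();
            } else {
                markCheckpointInUse.accept(entry.createdByCheckpointId);
            }
        }
    }

    /** Returns the subsumed checkpoint IDs below the lowest valid ID that no surviving entry refers to. */
    Set<Long> cleanableCheckpoints(Set<Long> subsumed, long lowestCheckpointId) {
        Set<Long> inUse = new HashSet<>();
        unregisterState(lowestCheckpointId, inUse::add);
        Set<Long> cleanable = new HashSet<>();
        for (long id : subsumed) {
            if (id < lowestCheckpointId && !inUse.contains(id)) {
                cleanable.add(id);
            }
        }
        return cleanable;
    }
}
```

In this model, a checkpoint whose state is still referenced by a newer checkpoint is kept even after it has been subsumed, which corresponds to the CLAIM-mode case the PR targets.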





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874950094


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##########
@@ -137,23 +134,19 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
 
         completedCheckpoints.addLast(checkpoint);
 
+        // Register checkpoint to SharedStateRegistry, and checkpoint would be discarded during
+        // unregistering.
+        getSharedStateRegistry().registerCompletedCheckpoint(checkpoint);
+
+        // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard.
         Optional<CompletedCheckpoint> subsume =
                 CheckpointSubsumeHelper.subsume(
                         completedCheckpoints,
                         maxNumberOfCheckpointsToRetain,
-                        completedCheckpoint ->
-                                tryRemoveCompletedCheckpoint(
-                                        completedCheckpoint,
-                                        completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                                        checkpointsCleaner,
-                                        postCleanup));
+                        completedCheckpoint -> tryRemove(completedCheckpoint.getCheckpointID()));

Review Comment:
   Yes, it is a duplicate call to clean up the checkpoint. I now pass an empty subsume action to `unregisterUnusedState` to avoid the duplicate cleanup.
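
The "remove from the queue, but do not discard" step in the diff above can be illustrated with a small standalone sketch. It is only a model: `SubsumeSketch` and its `subsume` method are hypothetical, checkpoints are reduced to their IDs, and the real `CheckpointSubsumeHelper` may apply additional rules that are not modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of subsuming without discarding; not the Flink helper. */
final class SubsumeSketch {

    /**
     * Removes the oldest checkpoint IDs until at most maxRetained remain.
     * Nothing is discarded here; the removed IDs are returned so that another
     * component can decide later whether their state is still in use.
     */
    static List<Long> subsume(Deque<Long> completedCheckpoints, int maxRetained) {
        List<Long> removed = new ArrayList<>();
        while (completedCheckpoints.size() > maxRetained) {
            removed.add(completedCheckpoints.removeFirst());
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<Long> completed = new ArrayDeque<>();
        completed.addLast(1L);
        completed.addLast(2L);
        completed.addLast(3L);
        System.out.println("removed: " + subsume(completed, 2));
        System.out.println("retained: " + completed);
    }
}
```

Separating removal from disposal in this way leaves a single place responsible for the actual cleanup, which is how the duplicate cleanup mentioned in the comment is avoided.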





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r854122607


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   Thanks for verifying. Indeed, the whole config is currently available to the JM.
   
   However, I'm not sure whether we should rely on it because:
   1. TMs can still have a different configuration (I'm not sure whether this is only a theoretical situation).
   2. The configuration is called `jobMasterConfiguration`, which means it doesn't have to be the whole configuration.
   
   So far we have tried not to rely on it, IIRC.
   
   We probably don't need to make this decision now, before deciding which approach to take.





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874963782


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -47,23 +50,33 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     /** All registered state objects by an artificial key */
     private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
 
+    /** All registered checkpoints */
+    private final Set<CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   I replaced `Set<CompletedCheckpoint>` with `Map<Long, CompletedCheckpoint>` for now.
   
   > Do you think it could/should be refactored so that CompletedCheckpointStore "transfers the ownership" of subsumed checkpoints to SharedStateRegistry?
   > E.g. by passing them to unregisterUnusedState.
   
   I think that would work well. I will try passing `DefaultCompletedCheckpointStore.completedCheckpoints` to `unregisterUnusedState` later.





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r876082210


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,

Review Comment:
   You are right, I added the ID to the wrapper class.





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r854060011


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   > will this work if flink-conf.yaml contains state.backend.changelog.enabled=true and job doesn't specify changelog or even any backend?
   
   Yes, it works, according to my test results.
   I set `state.backend.changelog.enabled: true` in `flink-conf.yaml` and ran `examples/SocketWindowWordCount.jar`. Here is the JM log, which shows that changelog is enabled:
   ```
   2022-04-20 18:52:46,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'Socket Window WordCount' (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,585 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,595 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job Socket Window WordCount (23bf9f9518357ade7bec63df910fae98).
   2022-04-20 18:52:46,595 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
   2022-04-20 18:52:46,600 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
   2022-04-20 18:52:46,600 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37e3b93e
   2022-04-20 18:52:46,612 INFO  org.apache.flink.state.changelog.ChangelogStateBackend       [] - ChangelogStateBackend is used, delegating HashMapStateBackend.
   ```
   Meanwhile, the `StateBackendName` is set in `DefaultExecutionGraph.enableCheckpointing()`, and `enableCheckpointing()` is called by `DefaultExecutionGraphBuilder.buildGraph()` in https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java#L307
   
   So the `StateBackendName` has already been set before we call `executionGraphToRestore.getStateBackendName()`: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L149
   





[GitHub] [flink] Myasuka commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r857419632


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   @rkhachatryan, `StateBackendLoader` is called on both the JM and the TM side, so I think we don't need to worry about this.





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881847122


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +63,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
+    @VisibleForTesting
     void unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   Delete it. It is used by `AbstractCompleteCheckpointStore#unregisterUnusedState()`, and  `AbstractCompleteCheckpointStore#unregisterUnusedState()` is used by `EmbeddedCompletedCheckpointStore#removeOldestCheckpoint()`, which is used for testing.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r902552365


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointsCleanerTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestCompletedCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link CheckpointsCleaner} test. */
+public class CheckpointsCleanerTest {
+
+    @Test
+    public void testCleanSubsumedCheckpointNormal() {

Review Comment:
   Thanks!





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r902272757


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointsCleanerTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestCompletedCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link CheckpointsCleaner} test. */
+public class CheckpointsCleanerTest {
+
+    @Test
+    public void testCleanSubsumedCheckpointNormal() {

Review Comment:
   Done, split it into `testUnregisterUnusedState`, `testNotCleanCheckpointInUse`, and `testNotCleanHigherCheckpoint`.





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895299786


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);

Review Comment:
   It is not needed; I have deleted it.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853979792


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   I'm not sure whether it's always possible to infer whether the changelog is enabled or not from `executionGraphToRestore.getStateBackendName()`.
   In particular, will this work if `flink-conf.yaml` contains `state.backend.changelog.enabled=true`?
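   To make the concern concrete, here is a minimal sketch (not Flink code) contrasting the two checks. `inferFromBackendName` and `inferFromConfig` are hypothetical helpers, and a plain `Map` stands in for the cluster configuration; only the key `state.backend.changelog.enabled` and the name `ChangelogStateBackend` come from this thread. Whether the reported name can really differ from the configured flag is exactly the open question.

```java
import java.util.Map;
import java.util.Optional;

final class ChangelogFlagSketch {

    // Mirrors the check in the diff: relies only on the reported backend name.
    static boolean inferFromBackendName(Optional<String> stateBackendName) {
        return stateBackendName.isPresent()
                && "ChangelogStateBackend".equals(stateBackendName.get());
    }

    // Hypothetical alternative: consult the configuration key directly.
    static boolean inferFromConfig(Map<String, String> conf) {
        return Boolean.parseBoolean(
                conf.getOrDefault("state.backend.changelog.enabled", "false"));
    }

    public static void main(String[] args) {
        // Assumed scenario: changelog is enabled via configuration only,
        // while the reported name is that of the delegated backend.
        Map<String, String> conf = Map.of("state.backend.changelog.enabled", "true");
        Optional<String> reportedName = Optional.of("HashMapStateBackend");

        System.out.println("from name:   " + inferFromBackendName(reportedName));
        System.out.println("from config: " + inferFromConfig(conf));
    }
}
```

   If the two checks can disagree in practice, the name-based check would silently skip the changelog-specific restore path.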





[GitHub] [flink] Myasuka commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r857491738


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   Although we cannot guarantee that `flink-conf` is the same on both the JM and TM sides, a mismatch usually does not happen in production environments. What's more, mismatched configurations would also cause other problems besides this one.
   I think we don't need to focus on this corner case; we should focus on the general pros and cons of each solution.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r857507765


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   I totally agree; that's what I meant above by "We probably don't need to make this decision now, before deciding which approach to take."





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r849248992


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   I have several concerns about placing all this logic here in `CompletedCheckpoint`:
   1. `CompletedCheckpoint` is intended to be immutable; here, its contents are changed after potentially being used
   1. The programmer can make a mistake by registering this checkpoint with the registry using some other method (and not rebuilding handles)
   1. `CompletedCheckpoint` is made aware of Changelog and some details, like `ChangelogStateBackendHandleImpl`
   
   How about moving all this code of rebuilding the checkpoint closer to where it's loaded? I.e. `Checkpoints.loadAndValidateCheckpoint` and `DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoint`?
   
   Additionally, for (3), I think we cannot avoid **some** JM code being aware of changelog (unless we generalize this somehow, which is probably too early for now).
   But maybe this code can be put in a single changelog-related class, e.g. existing `ChangelogStateBackendHandle` or some new utility class.
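   As a rough sketch of the shape I have in mind (not Flink code): the checkpoint stays immutable, and a changelog-related utility returns a rebuilt copy at load time. All names here (`ChangelogRebuildSketch`, `Checkpoint`, `rebuild`) are hypothetical, and plain strings stand in for `KeyedStateHandle`s.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical changelog-related utility that owns the rebuild logic.
final class ChangelogRebuildSketch {

    // Hypothetical stand-in for an immutable completed checkpoint.
    static final class Checkpoint {
        private final long id;
        private final List<String> handles;

        Checkpoint(long id, List<String> handles) {
            this.id = id;
            // Defensive copy so later changes to the argument cannot leak in.
            this.handles = Collections.unmodifiableList(new ArrayList<>(handles));
        }

        long getId() {
            return id;
        }

        List<String> getHandles() {
            return handles;
        }
    }

    private ChangelogRebuildSketch() {}

    // Returns a new checkpoint with wrapped handles; the input is never mutated.
    static Checkpoint rebuild(Checkpoint loaded, boolean changelogEnabled) {
        if (!changelogEnabled) {
            return loaded;
        }
        List<String> wrapped = new ArrayList<>();
        for (String handle : loaded.getHandles()) {
            // Wrap only handles that are not already changelog handles.
            wrapped.add(handle.startsWith("changelog(") ? handle : "changelog(" + handle + ")");
        }
        return new Checkpoint(loaded.getId(), wrapped);
    }

    public static void main(String[] args) {
        Checkpoint loaded = new Checkpoint(1L, List.of("sst-1", "sst-2"));
        Checkpoint rebuilt = rebuild(loaded, true);
        System.out.println("loaded:  " + loaded.getHandles());
        System.out.println("rebuilt: " + rebuilt.getHandles());
    }
}
```

   With this shape, any later registration sees handles that are already rebuilt, so (1) and (2) no longer depend on which registration method is called.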





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1153441438

   Thanks @rkhachatryan for the review.
   
   > I also see that KeyedStateCheckpointingITCase.testWithMemoryBackendSync timed out on azure - could you please check that it's not related?
   
   I ran `KeyedStateCheckpointingITCase.testWithMemoryBackendSync` 10 times locally and it ran normally each time, so it looks unrelated.
   
   > Besides that, when running locally, ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode times out - could you please check it too?
   
   `ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode` performs `checkpoint` and `restore` multiple times, which takes a long time. I deleted the `sleep` in it and ran it again; it completed normally.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r894476700


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +56,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
 
+    /** All subsumed checkpoints. */
+    private final Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>();

Review Comment:
   Looks like this field can be a `List` now?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {

Review Comment:
   Could you please add javadoc to this method?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,12 +59,13 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
-    void unregisterUnusedState(long lowestCheckpointID);
+    Set<Long> unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   Could you please update the javadoc and describe what this method returns now?
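   Something along these lines, as a sketch only. `RegistrySketch` is a hypothetical toy registry, and the `@return` wording reflects my assumption about the intended semantics (IDs of the checkpoints that created entries which remain registered), which the author should confirm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy registry, not the real SharedStateRegistry: string keys stand in for
// SharedStateRegistryKey and no state handles are tracked.
final class RegistrySketch {

    // key -> {ID of the checkpoint that created the entry, ID of the last checkpoint using it}
    private final Map<String, long[]> entries = new HashMap<>();

    void registerReference(String key, long checkpointID) {
        long[] entry = entries.get(key);
        if (entry == null) {
            entries.put(key, new long[] {checkpointID, checkpointID});
        } else {
            entry[1] = Math.max(entry[1], checkpointID);
        }
    }

    /**
     * Unregister state that is not referenced by the given checkpoint ID or any newer.
     *
     * @param lowestCheckpointID which is still valid
     * @return IDs of the checkpoints that created state which is still registered after this
     *     call (assumed semantics)
     */
    Set<Long> unregisterUnusedState(long lowestCheckpointID) {
        // Drop every entry whose last use is older than the lowest valid checkpoint.
        entries.values().removeIf(entry -> entry[1] < lowestCheckpointID);
        Set<Long> stillInUse = new TreeSet<>();
        for (long[] entry : entries.values()) {
            stillInUse.add(entry[0]);
        }
        return stillInUse;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.registerReference("a", 1L); // created by checkpoint 1
        registry.registerReference("b", 2L); // created by checkpoint 2
        registry.registerReference("a", 3L); // checkpoint 3 reuses "a"
        System.out.println(registry.unregisterUnusedState(3L));
    }
}
```

   In the example, checkpoint 1 is older than the lowest valid ID but still reported, because checkpoint 3 reuses the state it created.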



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);

Review Comment:
   This test relies on materialization, but the interval is quite big and might prevent it from happening - should it be decreased?
   
   And ideally, we should verify the presence of the materialized part - similar to `ChangelogPeriodicMaterializationITCase`.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(
+                                checkpoint,
+                                checkpoint.shouldBeDiscardedOnSubsume(),
+                                postCleanAction,
+                                executor);
+                        iterator.remove();
+                    } catch (Exception e) {
+                        LOG.warn("Fail to discard the old checkpoint {}.", checkpoint);
+                    }
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void cleanSubsumedCheckpointsWithException(
+            long upTo,
+            Set<Long> stillInUse,
+            Runnable postCleanAction,
+            Executor executor,
+            DiscardException discardException) {

Review Comment:
   IIUC, this method is only used in tests, and that test only verifies this method's own behavior. This doesn't make much sense to me; maybe we can just remove both?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));

Review Comment:
   1. Ideally, folder deletion should also be tested in the case of checkpoint subsumption.
   2. Is it guaranteed that the folder is cleaned up by the time `cancelJob.get` returns? If not, the assertion might be flaky.
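   If cleanup turns out to be asynchronous, one option for (2) is to poll with a deadline instead of asserting immediately. A minimal sketch, assuming a hypothetical `waitUntil` helper (the project's test utilities may already offer an equivalent); an `AtomicBoolean` stands in for the folder-existence check.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class WaitUntilSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition was observed to hold.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report the current state.
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for "checkpoint folder exists"; flipped by a background cleanup thread.
        AtomicBoolean folderExists = new AtomicBoolean(true);
        Thread cleanup =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(50);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            folderExists.set(false);
                        });
        cleanup.start();

        boolean deleted =
                waitUntil(() -> !folderExists.get(), Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("deleted within timeout: " + deleted);
    }
}
```

   The test would then assert on the result of the wait, which bounds the run time without depending on a fixed `sleep`.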



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);

Review Comment:
   There's no guarantee that private state is still needed - it could be that the delegated state backend materialized with completely new private state, right? (just asking)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(

Review Comment:
   I am concerned about `synchronized (subsumedCheckpoints)`:
   `cleanCheckpoint` eventually calls `incrementNumberOfCheckpointsToClean` which is synchronized on `this`. Synchronizing on a single object would make reasoning easier and less error-prone, WDYT?
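To make the concern concrete, here is a minimal sketch in which one monitor guards both the subsumed-checkpoint collection and the pending-cleanup counter. `SingleLockCleaner` and all of its members are hypothetical stand-ins for illustration, not Flink's `CheckpointsCleaner`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for a cleaner; not Flink's CheckpointsCleaner.
class SingleLockCleaner {
    // One monitor guards every piece of mutable state in this class.
    private final Object lock = new Object();
    private final List<Long> subsumed = new ArrayList<>();
    private int numberOfCheckpointsToClean;

    void addSubsumed(long checkpointId) {
        synchronized (lock) {
            subsumed.add(checkpointId);
        }
    }

    // Removes every subsumed ID below upTo that is not in stillInUse
    // and returns how many were removed.
    int cleanSubsumed(long upTo, Set<Long> stillInUse) {
        synchronized (lock) {
            int cleaned = 0;
            Iterator<Long> it = subsumed.iterator();
            while (it.hasNext()) {
                long id = it.next();
                if (id < upTo && !stillInUse.contains(id)) {
                    it.remove();
                    incrementNumberOfCheckpointsToClean();
                    cleaned++;
                }
            }
            return cleaned;
        }
    }

    private void incrementNumberOfCheckpointsToClean() {
        // Re-entering the same monitor is safe because Java monitors are reentrant.
        synchronized (lock) {
            numberOfCheckpointsToClean++;
        }
    }

    int getNumberOfCheckpointsToClean() {
        synchronized (lock) {
            return numberOfCheckpointsToClean;
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return subsumed.size();
        }
    }
}
```

With a single lock object there is only one lock order to reason about, so nested calls such as the counter increment cannot deadlock against a second monitor.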



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r882366289


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {

Review Comment:
   👍, added CheckpointsCleanerTest.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r880787079


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +58,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
+    /**
+     * Register a reference to the given checkpoint in the registry.
+     *
+     * @param checkpoint which is completed
+     */
+    void registerCompletedCheckpoint(CompletedCheckpoint checkpoint);

Review Comment:
   Wouldn't it be clearer to pass the checkpoint to `unregisterUnusedStateAndCheckpoint` (and remove this method)?
   
   That would require changing `Optional<Checkpoint>` to `List` in 
   `StandaloneCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne`, but that shouldn't be an issue.
   
   edit:
   the methods make sense if we introduce `SubsumedCheckpointStore`/Cleaner as discussed offline





[GitHub] [flink] rkhachatryan commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1161723316

   Thanks again for updating the PR @fredia ,
   The changes LGTM, could you please squash the commits?
   
   Meanwhile, I've [submitted](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/148/) the PR for benchmarking to see if there are any potential regressions.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873863463


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -47,23 +50,33 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     /** All registered state objects by an artificial key */
     private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
 
+    /** All registered checkpoints */
+    private final Set<CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   I realized that this field is now (partially) duplicating `DefaultCompletedCheckpointStore.completedCheckpoints`.
   
   Do you think it could/should be refactored so that `CompletedCheckpointStore` "transfers the ownership" of subsumed checkpoints to `SharedStateRegistry`? 
   E.g. by passing them to `unregisterUnusedState`.
   
   Another issue: `CompletedCheckpoint` doesn't have `equals/hashCode` overridden, so it's better to use `Map<Long, CompletedCheckpoint>`.
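A small sketch of why the keying matters. `Checkpoint` below is a hypothetical stand-in that, like the class under discussion, does not override `equals`/`hashCode`; the helper names are made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CheckpointKeyingSketch {
    // Hypothetical stand-in: inherits identity-based equals/hashCode from Object.
    static final class Checkpoint {
        final long id;

        Checkpoint(long id) {
            this.id = id;
        }
    }

    // A HashSet relies on equals/hashCode, which here means object identity,
    // so two distinct objects with the same ID are both kept.
    static int sizeAfterAddingTwiceToSet(long id) {
        Set<Checkpoint> set = new HashSet<>();
        set.add(new Checkpoint(id));
        set.add(new Checkpoint(id));
        return set.size();
    }

    // A map keyed by the checkpoint ID keeps a single entry per ID.
    static int sizeAfterAddingTwiceToMap(long id) {
        Map<Long, Checkpoint> byId = new HashMap<>();
        byId.put(id, new Checkpoint(id));
        byId.put(id, new Checkpoint(id));
        return byId.size();
    }
}
```

Keying by ID also makes lookups and removals by checkpoint ID direct, without scanning the collection.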





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r875289701


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,

Review Comment:
   The ID is formally necessary to implement `equals` (`hashCode`). Otherwise, each time this ID is used, `SharedStateRegistry` will try to discard it.
   Discard is a no-op, but currently deletions are scheduled one-by-one to the IO thread pool. So it might delay other IO tasks and/or lengthen the queue inside the pool.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873863463


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -47,23 +50,33 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     /** All registered state objects by an artificial key */
     private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
 
+    /** All registered checkpoints */
+    private final Set<CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   I realized that this field is now (partially) duplicating the `DefaultCompletedCheckpointStore.completedCheckpoints`.
   
   Do you think it could/should be refactored so that `CompletedCheckpointStore` "transfers the ownership" of subsumed checkpoints to `SharedStateRegistry`? E.g. by passing them to `unregisterUnusedState`.
   
   Another issue: `CompletedCheckpoint` doesn't have `equals/hashCode` overridden, so it's better to use `Map<Long, CompletedCheckpoint>`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,

Review Comment:
   I think this can be avoided because the checkpoint will be discarded once no state is in use, which will also discard the private state.
   And it's better to keep discarding there (in `CheckpointsCleaner`), rather than in `SharedStateRegistry` because the back-pressure mechanism is better there.
   
   So I propose to have an empty `discardState` method in the wrapper.
   (That would also allow replacing `keyedStateHandle` in it with only an ID)
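A rough sketch of what such a wrapper could look like. `Handle` is a hypothetical one-method interface and `IdOnlyHandle` a made-up name; Flink's `StreamStateHandle` has more methods than shown here. Equality is based on the ID so that two wrappers for the same state compare as equal, which is an assumption about how a registry would compare re-registered handles.

```java
import java.util.Objects;

class NoOpDiscardWrapperSketch {
    // Hypothetical minimal handle interface, for illustration only.
    interface Handle {
        void discardState() throws Exception;
    }

    // Wrapper that carries only an ID and never deletes anything itself.
    static final class IdOnlyHandle implements Handle {
        private final String stateHandleId;

        IdOnlyHandle(String stateHandleId) {
            this.stateHandleId = Objects.requireNonNull(stateHandleId);
        }

        @Override
        public void discardState() {
            // Intentionally empty: in this sketch, discarding is left to
            // whichever component owns the whole checkpoint.
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof IdOnlyHandle)) {
                return false;
            }
            return stateHandleId.equals(((IdOnlyHandle) o).stateHandleId);
        }

        @Override
        public int hashCode() {
            return stateHandleId.hashCode();
        }
    }
}
```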



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,12 +150,26 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
+    @Override
+    public void registerCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+        synchronized (registeredCheckpoints) {
+            LOG.trace("Register checkpoint {}.", checkpoint.getCheckpointID());
+            registeredCheckpoints.add(checkpoint);
+        }
+    }
+
+    @Override
+    public void setPostCleanAction(Runnable postCleanAction) {
+        this.postCleanAction = postCleanAction;

Review Comment:
   I'd prefer this action to be passed as an argument to `unregisterUnusedState` rather than having a mutable field, because
   1. on shutdown, it should not be called, right? with an argument, it's easier to pass a no-op callback
   2. it's easy to forget to call this setter and more difficult to understand **when** it is called
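A minimal sketch of the argument-passing variant, assuming a simplified registry. `Registry`, its method body, and `runScenario` are hypothetical and only show how each call site chooses its own callback.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PostCleanActionSketch {
    // Hypothetical registry: the callback is a parameter, not a mutable field.
    static final class Registry {
        void unregisterUnusedState(long lowestCheckpointId, Runnable postCleanAction) {
            // Discarding of unused state is omitted in this sketch.
            postCleanAction.run();
        }
    }

    // Returns how often the counting callback ran across a normal call
    // and a shutdown-style call that passes a no-op.
    static int runScenario() {
        AtomicInteger calls = new AtomicInteger();
        Registry registry = new Registry();

        // Normal operation: the caller supplies the real callback.
        registry.unregisterUnusedState(5L, calls::incrementAndGet);

        // Shutdown: the caller passes a no-op, so nothing is counted.
        registry.unregisterUnusedState(Long.MAX_VALUE, () -> {});

        return calls.get();
    }
}
```

Because the callback travels with the call, there is no setter to forget and no question about which action is active at a given moment.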



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##########
@@ -137,23 +134,19 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
 
         completedCheckpoints.addLast(checkpoint);
 
+        // Register checkpoint to SharedStateRegistry, and checkpoint would be discarded during
+        // unregistering.
+        getSharedStateRegistry().registerCompletedCheckpoint(checkpoint);
+
+        // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard.
         Optional<CompletedCheckpoint> subsume =
                 CheckpointSubsumeHelper.subsume(
                         completedCheckpoints,
                         maxNumberOfCheckpointsToRetain,
-                        completedCheckpoint ->
-                                tryRemoveCompletedCheckpoint(
-                                        completedCheckpoint,
-                                        completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                                        checkpointsCleaner,
-                                        postCleanup));
+                        completedCheckpoint -> tryRemove(completedCheckpoint.getCheckpointID()));

Review Comment:
   In `shutdown`, the store actually calls `checkpointCleaner.cleanCheckpoint`, as opposed to adding a checkpoint (where it only removes it from ZK).
   
   After that `shutdown` calls `unregisterUnusedState` (line 200).
   I wonder if there will be a duplicate call to clean up a checkpoint.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   As per the comment above, I think there should be a wrapper to have an empty `discardState`. So the interface may stay the same (FLINK-25862 not needed here IMO).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   Another question: can we get rid of the same call in `ChangelogStateBackendHandle`? 
   In fact, they will try to use the same key, won't they?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,14 +183,38 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    checkpointInUse.add(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
+
+        // One checkpoint should be discarded after all its private states have been discarded.
+        synchronized (registeredCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = registeredCheckpoints.iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < lowestCheckpointID
+                        && !checkpointInUse.contains(checkpoint.getCheckpointID())) {
+                    iterator.remove();
+                    LOG.trace("Discard checkpoint {}.", checkpoint.getCheckpointID());
+
+                    try {
+                        checkpointsCleaner.cleanCheckpoint(
+                                checkpoint,
+                                checkpoint.shouldBeDiscardedOnSubsume(),
+                                postCleanAction,
+                                asyncDisposalExecutor);
+                    } catch (Exception e) {
+                        LOG.warn("Fail to discard the old checkpoint.", checkpoint);

Review Comment:
   `{}` is missing in the message.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r901886009


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +77,50 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    /**
+     * Add one subsumed checkpoint to CheckpointsCleaner, the subsumed checkpoint would be discarded
+     * at {@link #cleanSubsumedCheckpoints(long, Set, Runnable, Executor)}.
+     *
+     * @param completedCheckpoint which is subsumed.
+     */
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (lock) {
+            subsumedCheckpoints.add(completedCheckpoint);
+        }
+    }
+
+    /**
+     * Clean checkpoint that is not in the given {@param stillInUse}.
+     *
+     * @param upTo lowest CheckpointID which is still valid.
+     * @param stillInUse that the state of it is still being referenced.
+     * @param postCleanAction
+     * @param executor

Review Comment:
   Remove the empty `@param` tags or describe them?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointsCleanerTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestCompletedCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link CheckpointsCleaner} test. */
+public class CheckpointsCleanerTest {
+
+    @Test
+    public void testCleanSubsumedCheckpointNormal() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+        TestCompletedCheckpoint cp1 = createCheckpoint(1, sharedStateRegistry);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("test_cp_in_use"), new TestingStreamStateHandle(), 1L);
+        checkpointsCleaner.addSubsumedCheckpoint(cp1);
+        TestCompletedCheckpoint cp2 = createCheckpoint(2, sharedStateRegistry);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("test_cp_in_use"), new TestingStreamStateHandle(), 2L);
+        checkpointsCleaner.addSubsumedCheckpoint(cp2);
+        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(2);
+        Set<Long> expectedInUse = new HashSet<>();
+        expectedInUse.add(1L);
+        assertEquals(expectedInUse, stillInUse);
+        checkpointsCleaner.cleanSubsumedCheckpoints(
+                3, stillInUse, () -> {}, Executors.directExecutor());
+        assertFalse(cp1.isDiscarded());
+        assertTrue(cp2.isDiscarded());
+        TestCompletedCheckpoint cp3 = createCheckpoint(3, sharedStateRegistry);
+        TestCompletedCheckpoint cp4 = createCheckpoint(4, sharedStateRegistry);
+        checkpointsCleaner.addSubsumedCheckpoint(cp3);
+        checkpointsCleaner.addSubsumedCheckpoint(cp4);
+        checkpointsCleaner.cleanSubsumedCheckpoints(
+                4, Collections.EMPTY_SET, () -> {}, Executors.directExecutor());

Review Comment:
   nit: `Collections.emptySet()`
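For context, a tiny sketch of the difference. `EmptySetSketch` and `noneInUse` are hypothetical names.

```java
import java.util.Collections;
import java.util.Set;

class EmptySetSketch {
    static Set<Long> noneInUse() {
        // emptySet() is generic, so the compiler infers Set<Long> here.
        // Collections.EMPTY_SET is a raw Set and would need an unchecked conversion.
        return Collections.emptySet();
    }
}
```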



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointsCleanerTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestCompletedCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link CheckpointsCleaner} test. */
+public class CheckpointsCleanerTest {
+
+    @Test
+    public void testCleanSubsumedCheckpointNormal() {

Review Comment:
   I'd split this test into several smaller tests:
   1. The set returned from `sharedStateRegistry.unregisterUnusedState` - this should be in `SharedStateRegistryTest`
   2. Not discarding checkpoints higher than `upTo` (and discarding lower than or equal to)
   3. Not discarding checkpoints still in use (and discarding not in use)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -43,13 +49,19 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
     private static final long serialVersionUID = 2545865801947537790L;
 
-    @GuardedBy("this")
+    private final Object lock = new Object();

Review Comment:
   There are now sections synchronized on this field and on `this`.
   Should it be a single object (either `this` or `lock`)?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointsCleanerTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestCompletedCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link CheckpointsCleaner} test. */
+public class CheckpointsCleanerTest {
+
+    @Test
+    public void testCleanSubsumedCheckpointNormal() {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+        TestCompletedCheckpoint cp1 = createCheckpoint(1, sharedStateRegistry);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("test_cp_in_use"), new TestingStreamStateHandle(), 1L);
+        checkpointsCleaner.addSubsumedCheckpoint(cp1);
+        TestCompletedCheckpoint cp2 = createCheckpoint(2, sharedStateRegistry);
+        sharedStateRegistry.registerReference(
+                new SharedStateRegistryKey("test_cp_in_use"), new TestingStreamStateHandle(), 2L);
+        checkpointsCleaner.addSubsumedCheckpoint(cp2);
+        Set<Long> stillInUse = sharedStateRegistry.unregisterUnusedState(2);
+        Set<Long> expectedInUse = new HashSet<>();
+        expectedInUse.add(1L);

Review Comment:
   nit: `Collections.singleton(1L)`
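For illustration, the two forms side by side. `SingletonSetSketch` and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class SingletonSetSketch {
    // The three-line form used in the test under review.
    static Set<Long> verbose() {
        Set<Long> expectedInUse = new HashSet<>();
        expectedInUse.add(1L);
        return expectedInUse;
    }

    // The suggested one-liner; note that the returned set is immutable.
    static Set<Long> concise() {
        return Collections.singleton(1L);
    }
}
```

Both sets compare as equal under `Set.equals`, so an `assertEquals` against either form behaves the same.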





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1161751361

   Thanks a lot for your detailed review, @rkhachatryan. I have squashed and rebased the commits.




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853004206


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   Thanks for your suggestion!
   Keeping `CompletedCheckpoint` immutable is the better approach, so I moved the code that rebuilds the checkpoint to `Checkpoints.loadAndValidateCheckpoint` and put the cast logic in `ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl`.
   
   > I think registering all KeyedStateHandles with the SharedStateRegistry on recovery in CLAIM mode would also solve the problem, wouldn't it?
   
   I think this suggestion may not work, because the `discardState()` of some KeyedStateHandles is **not empty**, so the state would be discarded when the checkpoint is subsumed.
   I also left a comment under [FLINK-25872](https://issues.apache.org/jira/browse/FLINK-25872); maybe we can discuss it in the ticket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1127369021

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873872760


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,12 +150,26 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
+    @Override
+    public void registerCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+        synchronized (registeredCheckpoints) {
+            LOG.trace("Register checkpoint {}.", checkpoint.getCheckpointID());
+            registeredCheckpoints.add(checkpoint);
+        }
+    }
+
+    @Override
+    public void setPostCleanAction(Runnable postCleanAction) {
+        this.postCleanAction = postCleanAction;

Review Comment:
   I'd prefer this action to be passed as an argument to `unregisterUnusedState` rather than kept as a mutable field, because:
   1. On shutdown, it should not be called, right? With an argument, it's easier to pass a no-op callback.
   2. It's easy to forget to call this setter, and harder to understand **when** it is called.
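   
   A minimal sketch of the shape suggested here, assuming a heavily simplified registry. `CallbackRegistrySketch` and its members are hypothetical names for illustration only, not Flink's `SharedStateRegistryImpl`:
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   
   // Hypothetical, simplified stand-in for a shared state registry (not Flink's class).
   class CallbackRegistrySketch {
       // state key -> ID of the latest checkpoint that references it
       private final Map<String, Long> lastUsedBy = new TreeMap<>();
   
       void register(String key, long checkpointId) {
           lastUsedBy.merge(key, checkpointId, Math::max);
       }
   
       // The post-clean action is a parameter, so each call site states explicitly
       // what runs after cleanup; there is no setter that could be forgotten.
       List<String> unregisterUnusedState(long lowestCheckpointId, Runnable postCleanAction) {
           List<String> removed = new ArrayList<>();
           Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<String, Long> entry = it.next();
               if (entry.getValue() < lowestCheckpointId) {
                   removed.add(entry.getKey());
                   it.remove();
               }
           }
           postCleanAction.run();
           return removed;
       }
   }
   ```
   
   With this shape, a shutdown path could pass a no-op such as `() -> {}`, while the regular subsumption path passes the real callback.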





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881340987


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +175,47 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        unregisterState(lowestCheckpointID, (x) -> {});

Review Comment:
   This method is only used to delete unused state and is called by `shutdown()`. On shutdown, all checkpoints are already discarded by `CompletedCheckpointStore`, so I prefer not to discard checkpoints again.





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1153669656

   @flinkbot run azure
   
   My local CI passed: https://dev.azure.com/fredia/flink/_build/results?buildId=342&view=results
   




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895296077


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);

Review Comment:
   This is ensured by making the **materialization interval** very long, so that materialization does not happen and the old private state is still in use.





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1134778763

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881739752


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +54,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
 
+    /** All subsumed checkpoints. */
+    private Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>();

Review Comment:
   IDE suggests that this field can be final.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +63,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
+    @VisibleForTesting
     void unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   The method is still used by production code, so the annotation is unnecessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        // only used for some tests
+        unregisterState(lowestCheckpointID, (x) -> {});
+    }
+
+    @Override
+    public void unregisterUnusedStateAndCheckpoint(
+            long lowestCheckpointID,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanUp,
+            @Nullable Executor executor) {
+        Set<Long> checkpointInUse = new HashSet<>();
+        unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id));
+        if (executor != null) {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, executor);
+        } else {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, asyncDisposalExecutor);

Review Comment:
   WDYT about returning checkpointsInUse from this function (`unregisterUnusedState`) and calling `checkpointsCleaner.cleanSubsumedCheckpoints` outside (i.e. in `CompletedCheckpointStore`)?
   
   That way, `SharedStateRegistry` stays unaware of checkpoint cleanup.
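   
   The proposal can be sketched roughly as follows. This is an illustration under simplifying assumptions: `InUseRegistrySketch` and `StateEntry` are hypothetical names, and only the `createdBy` / `lastUsedBy` bookkeeping mirrors the `entry.createdByCheckpointID` logic in the diff above:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical, simplified registry: it tracks state only and knows nothing
   // about how checkpoints are cleaned up.
   class InUseRegistrySketch {
       private static final class StateEntry {
           final long createdBy;
           long lastUsedBy;
   
           StateEntry(long createdBy) {
               this.createdBy = createdBy;
               this.lastUsedBy = createdBy;
           }
       }
   
       private final Map<String, StateEntry> entries = new HashMap<>();
   
       void registerReference(String key, long checkpointId) {
           StateEntry entry = entries.computeIfAbsent(key, k -> new StateEntry(checkpointId));
           entry.lastUsedBy = Math.max(entry.lastUsedBy, checkpointId);
       }
   
       // Drops entries that are no longer referenced and returns the IDs of the
       // checkpoints that created state which is still in use.
       Set<Long> unregisterUnusedState(long lowestCheckpointId) {
           Set<Long> checkpointsInUse = new HashSet<>();
           Iterator<StateEntry> it = entries.values().iterator();
           while (it.hasNext()) {
               StateEntry entry = it.next();
               if (entry.lastUsedBy < lowestCheckpointId) {
                   it.remove();
               } else {
                   checkpointsInUse.add(entry.createdBy);
               }
           }
           return checkpointsInUse;
       }
   }
   ```
   
   The caller (for example the checkpoint store) would then hand the returned set to the cleaner, and the returned set itself becomes easy to assert on in a unit test.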



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -107,6 +144,7 @@ private void maybeCompleteCloseUnsafe() {
         if (numberOfCheckpointsToClean == 0 && cleanUpFuture != null) {
             cleanUpFuture.complete(null);
         }
+        subsumedCheckpoints.clear();

Review Comment:
   Shouldn't this statement be placed inside the `if` block?
   Otherwise, it will clear `subsumedCheckpoints` after every checkpoint cleanup.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {

Review Comment:
   I think it makes sense to cover this logic by some unit tests.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        // only used for some tests
+        unregisterState(lowestCheckpointID, (x) -> {});
+    }
+
+    @Override
+    public void unregisterUnusedStateAndCheckpoint(
+            long lowestCheckpointID,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanUp,
+            @Nullable Executor executor) {
+        Set<Long> checkpointInUse = new HashSet<>();
+        unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id));

Review Comment:
   nit: `unregisterState(lowestCheckpointID, checkpointInUse::add);`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    iterator.remove();
+                    LOG.trace("Discard checkpoint {}.", checkpoint.getCheckpointID());

Review Comment:
   `trace` -> `debug`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##########
@@ -137,22 +137,21 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
 
         completedCheckpoints.addLast(checkpoint);
 
+        // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard.
         Optional<CompletedCheckpoint> subsume =
                 CheckpointSubsumeHelper.subsume(
                         completedCheckpoints,
                         maxNumberOfCheckpointsToRetain,
-                        completedCheckpoint ->
-                                tryRemoveCompletedCheckpoint(
-                                        completedCheckpoint,
-                                        completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                                        checkpointsCleaner,
-                                        postCleanup));
-        unregisterUnusedState(completedCheckpoints);
-
-        if (subsume.isPresent()) {
-            LOG.debug("Added {} to {} without any older checkpoint to subsume.", checkpoint, path);
-        } else {
-            LOG.debug("Added {} to {} and subsume {}.", checkpoint, path, subsume);
+                        completedCheckpoint -> {
+                            tryRemove(completedCheckpoint.getCheckpointID());
+                            checkpointsCleaner.addSubsumedCheckpoint(completedCheckpoint);
+                        });
+
+        Optional<Long> lowestCheckpointId = findLowest(completedCheckpoints);
+        if (lowestCheckpointId.isPresent()) {

Review Comment:
   nit: `isPresent` -> `ifPresent`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    iterator.remove();

Review Comment:
   Should we retry deletion in case of failure, by moving `iterator.remove();` under `catch`?
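   
   One possible reading of this question, as a sketch only: forget a subsumed checkpoint only after its discard succeeded, so that a failed discard is retried on the next cleanup. `DiscardableSketch` and `RetryingCleanerSketch` are hypothetical names, and the discard runs synchronously here, whereas the PR hands it to an executor:
   
   ```java
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical stand-in for a checkpoint whose discard may fail.
   interface DiscardableSketch {
       void discard() throws Exception;
   }
   
   class RetryingCleanerSketch {
       private final Map<Long, DiscardableSketch> subsumed = new HashMap<>();
   
       void addSubsumed(long checkpointId, DiscardableSketch checkpoint) {
           subsumed.put(checkpointId, checkpoint);
       }
   
       // Returns how many checkpoints were discarded. Checkpoints whose discard
       // failed stay in the map and are attempted again on the next call.
       int cleanSubsumed(long upTo, Set<Long> stillInUse) {
           int discarded = 0;
           Iterator<Map.Entry<Long, DiscardableSketch>> it = subsumed.entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<Long, DiscardableSketch> entry = it.next();
               if (entry.getKey() < upTo && !stillInUse.contains(entry.getKey())) {
                   try {
                       entry.getValue().discard();
                       it.remove(); // remove only once the discard succeeded
                       discarded++;
                   } catch (Exception e) {
                       // keep the entry so that a later call can retry
                   }
               }
           }
           return discarded;
       }
   
       int pending() {
           return subsumed.size();
       }
   }
   ```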



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,8 +146,8 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
-    @Override
-    public void unregisterUnusedState(long lowestCheckpointID) {
+    private void unregisterState(
+            long lowestCheckpointID, Consumer<Long> markCheckpointInUseAction) {

Review Comment:
   I think it makes sense to cover the logic of computing `markCheckpointInUseAction` by some unit tests.
   
   It could be possible if `unregisterUnusedState` returns this set as suggested in another comment.





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1171001821

   I re-ran the benchmarks for this PR and for master and kept only the items with a regression above 5%. Here are the top regressions from the Jenkins results:
   
   Benchmark | Unit |  [PR run#1](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/159/) |   [PR run#2](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/160/) |  [master run#1](http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-java8/448/) | [master run#2](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/161/) | param
   -- | -- | -- | -- | -- | -- | --
   org.apache.flink.benchmark.BlockingPartitionBenchmark.uncompressedMmapPartition | ops/ms | 8666.328735 | 9042.724129 | 9446.251683 | 9157.253861 |  
   org.apache.flink.benchmark.InputBenchmark.mapRebalanceMapSink | ops/ms | 15071.44224 | 13998.4278 | 13563.52986 | 14826.58196 |  
   org.apache.flink.benchmark.MultipleInputBenchmark.multiInputOneIdleMapSink | ops/ms | 9849.316859 | 10965.11345 | 10850.4832 | 10897.66905 |  
   org.apache.flink.benchmark.RemoteChannelThroughputBenchmark.remoteRebalance | ops/ms | 16325.58916 | 16523.94144 | 16774.34937 | 17514.40819 | DEBLOAT
   org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor.networkThroughput | ops/ms | 65898.42896 | 61612.85227 | 68250.9443 | 68369.23892 | 100,100ms
   org.apache.flink.benchmark.TwoInputBenchmark.twoInputMapSink | ops/ms | 17380.14833 | 17777.86862 | 17506.91314 | 18379.76836 |  
   org.apache.flink.benchmark.WindowBenchmarks.globalWindow | ops/ms | 6066.933889 | 6326.320592 | 6621.917999 | 6370.900111 |  
   org.apache.flink.benchmark.WindowBenchmarks.slidingWindow | ops/ms | 660.021689 | 685.552758 | 701.218769 | 663.998936 |  
   
   
   
   I also re-ran `remoteRebalance`, `networkThroughput`, and `globalWindow` on my local machine:
   
   `org.apache.flink.benchmark.RemoteChannelThroughputBenchmark.remoteRebalance`:
   
     | run#1 | run#2 | run#3 | run#4
   -- | -- | -- | -- | --
   master | 14376.388 ± 916.003 | 14346.302 ± 1222.127 | 16307.239 ± 532.418 | 12082.863 ± 1558.623
   PR19448 | 14503.878 ± 1245.878 | 13375.936 ± 1423.362 | 13242.821 ± 1636.507 | 13585.857 ± 1020.836
   
   `org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor.networkThroughput`:
   
     | run#1 | run#2 | run#3 | run#4
   -- | -- | -- | -- | --
   master | 60693.460 ± 3910.983 | 67485.945 ± 2368.999 | 67428.326 ± 2640.449 | 65570.558 ± 4839.778
   PR19448 | 62581.403 ± 3676.553 | 68797.921 ± 2080.682 | 59300.429 ± 9673.195 | 70301.194 ± 1727.879
   
   `org.apache.flink.benchmark.WindowBenchmarks.globalWindow`:
   
     | run#1 | run#2 | run#3 | run#4
   -- | -- | -- | -- | --
   master | 5528.938 ± 1420.702 | 6685.339 ± 350.645 | 6678.014 ± 353.418 | 6374.245 ± 947.530
   PR19448 | 6536.733 ± 571.271 | 6789.017 ± 247.339 | 6836.893 ± 369.841 | 6973.793 ± 240.786
   
   According to the data above, this PR is **not always worse than** master, so I think the regressions are caused by noise. @rkhachatryan WDYT?
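   
   As a rough cross-check of the local numbers, the four runs per side can be averaged and compared. The sketch below uses the `globalWindow` and `remoteRebalance` means from the tables above; the class and method names are made up for illustration, and the ± error of each run is ignored:
   
   ```java
   import java.util.Arrays;
   
   // Values are the local-run means quoted in the tables above (ops/ms, higher is better).
   final class BenchmarkNoiseSketch {
       static final double[] MASTER_GLOBAL_WINDOW = {5528.938, 6685.339, 6678.014, 6374.245};
       static final double[] PR_GLOBAL_WINDOW = {6536.733, 6789.017, 6836.893, 6973.793};
       static final double[] MASTER_REMOTE_REBALANCE = {14376.388, 14346.302, 16307.239, 12082.863};
       static final double[] PR_REMOTE_REBALANCE = {14503.878, 13375.936, 13242.821, 13585.857};
   
       static double mean(double[] xs) {
           return Arrays.stream(xs).average().orElse(Double.NaN);
       }
   
       // Relative change of the PR mean against the master mean, in percent
       // (negative means lower throughput with the PR).
       static double relativeChangePercent(double[] master, double[] pr) {
           double masterMean = mean(master);
           return (mean(pr) - masterMean) / masterMean * 100.0;
       }
   }
   ```
   
   With these inputs the averaged change is positive for `globalWindow` and negative for `remoteRebalance`, so the direction is not the same across benchmarks.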
   
   




[GitHub] [flink] rkhachatryan commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1160699889

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873845873


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,

Review Comment:
   I think deletion by the `SharedStateRegistry` can and should be avoided.
   The checkpoint will be discarded once none of its state is in use, which will also discard the private state.
   It's better to keep it this way (deletion by `CheckpointsCleaner` rather than `SharedStateRegistry`) because the back-pressure mechanism there is better; the purpose of registering here is simply to "mark" the **checkpoint** as used.
   
   So I propose having an empty `discardState` method in the wrapper (that would also allow replacing `keyedStateHandle` in it with just an ID).
   
   WDYT?
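   
   A minimal sketch of such a wrapper, assuming a cut-down handle interface. `HandleSketch` and `EmptyDiscardHandleSketch` are hypothetical; Flink's real `StreamStateHandle` has more methods:
   
   ```java
   // Hypothetical minimal handle interface, for illustration only.
   interface HandleSketch {
       void discardState();
   
       long getStateSize();
   }
   
   // Placeholder that is registered only to mark a checkpoint as in use.
   // It carries just an ID and deletes nothing itself.
   final class EmptyDiscardHandleSketch implements HandleSketch {
       private final String stateHandleId;
   
       EmptyDiscardHandleSketch(String stateHandleId) {
           this.stateHandleId = stateHandleId;
       }
   
       String getStateHandleId() {
           return stateHandleId;
       }
   
       @Override
       public void discardState() {
           // intentionally empty: the private state is removed when the owning
           // checkpoint is discarded, not by the registry
       }
   
       @Override
       public long getStateSize() {
           return 0L;
       }
   }
   ```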





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874946254


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,12 +150,26 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
+    @Override
+    public void registerCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+        synchronized (registeredCheckpoints) {
+            LOG.trace("Register checkpoint {}.", checkpoint.getCheckpointID());
+            registeredCheckpoints.add(checkpoint);
+        }
+    }
+
+    @Override
+    public void setPostCleanAction(Runnable postCleanAction) {
+        this.postCleanAction = postCleanAction;

Review Comment:
   Nice suggestion. I now pass the `subsumAction` as an argument to `unregisterUnusedState`; the subsume action contains `postCleanAction`.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873881105


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##########
@@ -137,23 +134,19 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
 
         completedCheckpoints.addLast(checkpoint);
 
+        // Register checkpoint to SharedStateRegistry, and checkpoint would be discarded during
+        // unregistering.
+        getSharedStateRegistry().registerCompletedCheckpoint(checkpoint);
+
+        // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard.
         Optional<CompletedCheckpoint> subsume =
                 CheckpointSubsumeHelper.subsume(
                         completedCheckpoints,
                         maxNumberOfCheckpointsToRetain,
-                        completedCheckpoint ->
-                                tryRemoveCompletedCheckpoint(
-                                        completedCheckpoint,
-                                        completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                                        checkpointsCleaner,
-                                        postCleanup));
+                        completedCheckpoint -> tryRemove(completedCheckpoint.getCheckpointID()));

Review Comment:
   In `shutdown()`, the store actually calls `checkpointsCleaner.cleanCheckpoint`, whereas when adding a checkpoint, it only removes the checkpoint from ZK.
   
   After that, `shutdown()` calls `unregisterUnusedState` (line 200).
   I wonder whether this results in a duplicate call to clean up a checkpoint.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r880782026


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   Yes, you're right.
   Does it make sense to use the same `EmptyDiscardStateObjectForRegister` in `ChangelogStateBackendHandleImpl`?
   The deletion of private state should happen when discarding the `CompletedCheckpoint` anyway, right?





[GitHub] [flink] flinkbot commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1097494365

   ## CI report:
   
   * d968b906c4e1f70f36b8d62fe53a4e041a3afd4f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r882367014


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,8 +146,8 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
-    @Override
-    public void unregisterUnusedState(long lowestCheckpointID) {
+    private void unregisterState(
+            long lowestCheckpointID, Consumer<Long> markCheckpointInUseAction) {

Review Comment:
   I tested the `markCheckpointInUseAction` in `CheckpointsCleanerTest`.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873853860


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   (replying to https://github.com/apache/flink/pull/19448#discussion_r873445202)
   As per the [comment](https://github.com/apache/flink/pull/19448#discussion_r873845873) above, I think there should be a wrapper to have an empty `discardState`. So the interface may stay the same (FLINK-25862 not needed here IMO).
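
   For illustration, a wrapper with an empty `discardState` could look roughly like the sketch below. This is a minimal, self-contained sketch and not Flink code: `StateObject`, `RecordingHandle` and `EmptyDiscardWrapper` are hypothetical stand-ins, and the real state handle interfaces have more methods. The idea is that the registry can drop its reference without deleting data that the owning checkpoint deletes itself.

```java
public class EmptyDiscardWrapperSketch {

    /** Hypothetical stand-in for a state object that can delete its backing data. */
    public interface StateObject {
        void discardState() throws Exception;
    }

    /** Hypothetical handle that only records whether its data was deleted. */
    public static final class RecordingHandle implements StateObject {
        private boolean discarded;

        @Override
        public void discardState() {
            discarded = true;
        }

        public boolean isDiscarded() {
            return discarded;
        }
    }

    /** Wrapper handed to the registry; discarding it never touches the wrapped state. */
    public static final class EmptyDiscardWrapper implements StateObject {
        private final StateObject delegate;

        public EmptyDiscardWrapper(StateObject delegate) {
            this.delegate = delegate;
        }

        public StateObject getDelegate() {
            return delegate;
        }

        @Override
        public void discardState() {
            // Intentionally empty: the owner of the delegate is responsible for deletion.
        }
    }

    public static void main(String[] args) {
        RecordingHandle privateState = new RecordingHandle();
        EmptyDiscardWrapper registered = new EmptyDiscardWrapper(privateState);

        registered.discardState(); // what the registry would call when the entry is unused
        System.out.println(privateState.isDiscarded()); // prints "false"

        privateState.discardState(); // what discarding the owning checkpoint would call
        System.out.println(privateState.isDiscarded()); // prints "true"
    }
}
```

   With such a wrapper, the registration interface can stay typed to a single handle type while deletion remains with the checkpoint.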





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r873445202


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,
+                // 2. update the status of the checkpoint in sharedStateRegistry to which the state
+                // handle belongs
+                sharedStateRegistry.registerReference(
+                        new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+                        new StreamStateHandleWrapper(stateHandle),

Review Comment:
   There are a few details I would like your opinion on:
   ● Should we refactor `SharedStateRegistry` here so that registration is not limited to `StreamStateHandle` (https://issues.apache.org/jira/browse/FLINK-25862)?
   ● If yes, how should we deal with the multi-register case? For changelog states, we would like to prevent `SharedStateRegistry` from discarding identical changelog states that are registered multiple times (https://issues.apache.org/jira/browse/FLINK-26101), which differs from how other state handles are treated. In other words, we would also need a wrapper with a different `#equals` method for changelog states.





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1127364909

   @rkhachatryan I updated the PR with the implementation of option 3. Could you please take another look?




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874932077


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##########
@@ -227,6 +229,14 @@ private static void registerSharedState(
         for (KeyedStateHandle stateHandle : stateHandles) {
             if (stateHandle != null) {
                 stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
+                // Registering state handle to the given sharedStateRegistry serves two purposes:
+                // 1. let sharedStateRegistry be responsible for cleaning the state handle,

Review Comment:
   Nice suggestion! I think that the ID is also unnecessary, because `SharedStateRegistryKey` already contains the state handle ID.
   





[GitHub] [flink] fredia commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1118305413

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r901863131


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", checkpoint.getCheckpointID());
+                        cleanCheckpoint(

Review Comment:
   The synchronization is still using different objects.
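
   A minimal sketch of what guarding everything with one monitor could look like, assuming the concern is that the map of subsumed checkpoints is locked on one object while the cleanup counter is locked on another. The class and its members are hypothetical and simplified (checkpoints are plain names here); it is not the actual `CheckpointsCleaner`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class SingleLockCleanerSketch {

    /** The single monitor guarding every mutable field below. */
    private final Object lock = new Object();

    /** Guarded by lock. Checkpoint ID -> hypothetical checkpoint name. */
    private final Map<Long, String> subsumedCheckpoints = new TreeMap<>();

    /** Guarded by lock. */
    private int numberOfCheckpointsToClean;

    public void addSubsumedCheckpoint(long checkpointId, String name) {
        synchronized (lock) {
            subsumedCheckpoints.put(checkpointId, name);
        }
    }

    /** Removes and returns the IDs below upTo that are not marked as still in use. */
    public List<Long> cleanSubsumedCheckpoints(long upTo, Set<Long> stillInUse) {
        List<Long> toClean = new ArrayList<>();
        synchronized (lock) {
            Iterator<Long> iterator = subsumedCheckpoints.keySet().iterator();
            while (iterator.hasNext()) {
                long id = iterator.next();
                if (id < upTo && !stillInUse.contains(id)) {
                    iterator.remove();
                    numberOfCheckpointsToClean++; // updated under the same lock as the map
                    toClean.add(id);
                }
            }
        }
        return toClean;
    }

    public int pendingCleanups() {
        synchronized (lock) {
            return numberOfCheckpointsToClean;
        }
    }

    public Set<Long> remaining() {
        synchronized (lock) {
            return new TreeSet<>(subsumedCheckpoints.keySet());
        }
    }

    public static void main(String[] args) {
        SingleLockCleanerSketch cleaner = new SingleLockCleanerSketch();
        cleaner.addSubsumedCheckpoint(1L, "chk-1");
        cleaner.addSubsumedCheckpoint(2L, "chk-2");
        cleaner.addSubsumedCheckpoint(3L, "chk-3");
        System.out.println(cleaner.cleanSubsumedCheckpoints(3L, Collections.singleton(2L))); // prints "[1]"
        System.out.println(cleaner.remaining()); // prints "[2, 3]"
    }
}
```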





[GitHub] [flink] rkhachatryan commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19448:
URL: https://github.com/apache/flink/pull/19448#issuecomment-1171138404

   Thanks for the investigation, @fredia.
   I'm still concerned about the `globalWindow` benchmark.
   Previously, it didn't fall below ~6600 (except for the unrelated regression).
   So I assume the results you posted are from local runs. However, I think it's more reliable to use a dedicated machine (http://codespeed.dak8s.net:8000/). I've re-launched the `globalWindow` benchmark for master and for this PR; let's wait for the results.




[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895294357


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);

Review Comment:
   The test scenario is:
     1. Changelog disabled (the materialization interval has no effect here); take chk-1 to restore from.
     2. Changelog enabled, restore from chk-1, take chk-2 (**no materialization occurs, so the materialized part of chk-2 is in chk-1**).
     3. Changelog enabled, restore from chk-2, and test whether the materialized part in chk-1 still exists.
   
   So the interval is deliberately set to a long value here to avoid materialization. This simulates a scenario where a job restores again before any materialization has occurred.
   





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r895303274


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));

Review Comment:
   > 2. Is it guaranteed that the folder is cleaned up by the time `cancelJob.get` returns? If not, the assertion might be flaky
   
   The folder may be cleaned up either at checkpoint subsumption or at shutdown. You are right, the assertion might be flaky. This test is not stable, so I deleted it. Folder deletion can be tested by `CheckpointsCleanerTest#testCleanSubsumedCheckpointNormal` instead.
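
   For reference, if an integration-level assertion on asynchronous cleanup were ever kept, one common way to reduce flakiness is to poll for the condition with a deadline rather than asserting right after `cancelJob(...).get()` returns. The helper below is a hypothetical, self-contained sketch (the class name `AwaitConditionSketch` and the method `await` are made up for illustration and are not Flink test utilities).

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AwaitConditionSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     *
     * @return true if the condition held before the deadline, false on timeout or interruption
     */
    public static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for "the checkpoint folder no longer exists": becomes true on the third poll.
        int[] polls = {0};
        boolean cleaned = await(() -> ++polls[0] >= 3, 2000, 10);
        System.out.println(cleaned); // prints "true"
    }
}
```

   A timeout still has to be chosen, so this only narrows the race; a unit test against the cleaner avoids it entirely.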





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881839740


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -107,6 +144,7 @@ private void maybeCompleteCloseUnsafe() {
         if (numberOfCheckpointsToClean == 0 && cleanUpFuture != null) {
             cleanUpFuture.complete(null);
         }
+        subsumedCheckpoints.clear();

Review Comment:
   Right, I moved it into the `if` block.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881373022


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +175,47 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        unregisterState(lowestCheckpointID, (x) -> {});

Review Comment:
   > all checkpoints are discarded by CompletedCheckpointStore when shutdown
   
   I doubt **all**, because some are transferred from `CompletedCheckpointStore` to `SharedStateRegistry`. So `CompletedCheckpointStore` simply doesn't reference them anymore, does it?
   
   (I think this is also true if we introduce the `SubsumedCheckpointStore` discussed offline)





[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881513176


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +175,47 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        unregisterState(lowestCheckpointID, (x) -> {});

Review Comment:
   You are right, thanks for the clarification.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r880787079


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +58,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
+    /**
+     * Register a reference to the given checkpoint in the registry.
+     *
+     * @param checkpoint which is completed
+     */
+    void registerCompletedCheckpoint(CompletedCheckpoint checkpoint);

Review Comment:
   Wouldn't it be clearer to pass the checkpoint to `unregisterUnusedStateAndCheckpoint` (and remove this method)?
   
   That would require changing `Optional<Checkpoint>` to `List` in 
   `StandaloneCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne`, but that shouldn't be an issue.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -47,6 +54,9 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     /** All registered state objects by an artificial key */
     private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
 
+    /** All registered checkpoints */
+    private final Map<Long, CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   Rename to `subsumedCheckpoints`? (and update the javadoc)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +175,47 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        unregisterState(lowestCheckpointID, (x) -> {});

Review Comment:
   Now that subsumed checkpoints are "owned" by the registry, it has to discard them. 
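
   To make the ownership question concrete, here is a minimal sketch of the marking step shown in the diff above: entries that are no longer used are dropped, and for every surviving entry the checkpoint that created it is reported as still in use, so a subsumed checkpoint is only discarded once nothing references it. Everything here is a simplified assumption rather than the actual `SharedStateRegistryImpl`: keys are plain strings, `SharedEntry` is hypothetical, and the `lastUsedCheckpointID < lowestCheckpointID` condition is assumed because the diff does not show it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class InUseMarkingSketch {

    /** Hypothetical registry entry: which checkpoint created the state and which used it last. */
    public static final class SharedEntry {
        final long createdByCheckpointID;
        final long lastUsedCheckpointID;

        public SharedEntry(long createdByCheckpointID, long lastUsedCheckpointID) {
            this.createdByCheckpointID = createdByCheckpointID;
            this.lastUsedCheckpointID = lastUsedCheckpointID;
        }
    }

    /** Drops unused entries and reports the creating checkpoint of every entry that survives. */
    public static List<String> unregisterState(
            Map<String, SharedEntry> registeredStates,
            long lowestCheckpointID,
            Consumer<Long> markCheckpointInUseAction) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, SharedEntry>> it = registeredStates.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SharedEntry> mapEntry = it.next();
            SharedEntry entry = mapEntry.getValue();
            if (entry.lastUsedCheckpointID < lowestCheckpointID) {
                discarded.add(mapEntry.getKey());
                it.remove();
            } else {
                // The state is still referenced, so its creating checkpoint must be kept.
                markCheckpointInUseAction.accept(entry.createdByCheckpointID);
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        Map<String, SharedEntry> states = new LinkedHashMap<>();
        states.put("a", new SharedEntry(1, 1)); // unused since checkpoint 1
        states.put("b", new SharedEntry(1, 3)); // created by 1, still used by 3
        states.put("c", new SharedEntry(2, 2)); // unused since checkpoint 2

        Set<Long> stillInUse = new HashSet<>();
        List<String> discarded = unregisterState(states, 3, stillInUse::add);

        System.out.println(discarded);  // prints "[a, c]"
        System.out.println(stillInUse); // prints "[1]"
    }
}
```

   In this example checkpoint 2 can be discarded once subsumed, while checkpoint 1 has to stay because entry `b` is still referenced.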





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r857464249


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -203,10 +204,15 @@ private void tryRestoreExecutionGraphFromSavepoint(
             final CheckpointCoordinator checkpointCoordinator =
                     executionGraphToRestore.getCheckpointCoordinator();
             if (checkpointCoordinator != null) {
+                Optional<String> stateBackendName = executionGraphToRestore.getStateBackendName();
+                boolean changelogEnabled =
+                        stateBackendName.isPresent()
+                                && "ChangelogStateBackend".equals(stateBackendName.get());

Review Comment:
   I meant that there is no guarantee that JM and TM call `StateBackendLoader` with the same configuration.


