Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/19 12:31:10 UTC

[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853004206


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   Thanks for your suggestion! 
   Keeping `CompletedCheckpoint` immutable is the better approach, so I moved the code that rebuilds the checkpoint into `Checkpoints.loadAndValidateCheckpoint` and put the cast logic in `ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl`.
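
A minimal sketch of the "rebuild at load time instead of mutating" idea. It uses hypothetical, heavily simplified types (`KeyedHandle`, `PlainHandle`, `ChangelogHandle`, `Checkpoint`, `CheckpointLoader`), not Flink's real classes. It assumes the loader wraps every managed keyed state handle that is not already a changelog handle as the materialized part of one, and returns a new checkpoint object so the loaded one is never modified. The actual code in `Checkpoints.loadAndValidateCheckpoint` may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, heavily simplified stand-ins; these are not Flink's real classes.
interface KeyedHandle {}

// Keyed state written without changelog (e.g. a plain backend snapshot).
final class PlainHandle implements KeyedHandle {
    final String path;

    PlainHandle(String path) {
        this.path = path;
    }
}

// Changelog-style handle whose materialized part is the pre-existing state.
final class ChangelogHandle implements KeyedHandle {
    final List<KeyedHandle> materialized;

    ChangelogHandle(List<KeyedHandle> materialized) {
        this.materialized = Collections.unmodifiableList(new ArrayList<>(materialized));
    }

    // The "cast logic": reuse a handle that already is a changelog handle,
    // otherwise wrap it as the materialized part of a new one.
    static ChangelogHandle from(KeyedHandle handle) {
        return handle instanceof ChangelogHandle
                ? (ChangelogHandle) handle
                : new ChangelogHandle(Collections.singletonList(handle));
    }
}

// Immutable checkpoint: subtask index -> managed keyed state handles.
final class Checkpoint {
    final Map<Integer, List<KeyedHandle>> subtaskStates;

    Checkpoint(Map<Integer, List<KeyedHandle>> subtaskStates) {
        Map<Integer, List<KeyedHandle>> copy = new LinkedHashMap<>();
        subtaskStates.forEach(
                (index, handles) ->
                        copy.put(index, Collections.unmodifiableList(new ArrayList<>(handles))));
        this.subtaskStates = Collections.unmodifiableMap(copy);
    }
}

final class CheckpointLoader {
    // Called once while loading: builds a NEW checkpoint with every keyed handle
    // converted, instead of mutating the loaded checkpoint in place.
    static Checkpoint loadForChangelog(Checkpoint loaded) {
        Map<Integer, List<KeyedHandle>> rebuilt = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<KeyedHandle>> entry : loaded.subtaskStates.entrySet()) {
            List<KeyedHandle> converted = entry.getValue().stream()
                    .<KeyedHandle>map(ChangelogHandle::from)
                    .collect(Collectors.toList());
            rebuilt.put(entry.getKey(), converted);
        }
        return new Checkpoint(rebuilt);
    }

    public static void main(String[] args) {
        Map<Integer, List<KeyedHandle>> states = new LinkedHashMap<>();
        states.put(0, Collections.<KeyedHandle>singletonList(new PlainHandle("chk-1/keyed-0")));
        Checkpoint loaded = new Checkpoint(states);
        Checkpoint rebuilt = loadForChangelog(loaded);

        System.out.println(loaded.subtaskStates.get(0).get(0) instanceof PlainHandle); // true
        System.out.println(rebuilt.subtaskStates.get(0).get(0) instanceof ChangelogHandle); // true
    }
}
```

Running `CheckpointLoader.main` prints `true` twice. The loaded checkpoint still holds the plain handle, while the rebuilt one holds the wrapped handle. Doing the conversion once at load time avoids mutating the restored state in place, as the `operatorState.putState(...)` call in the diff does.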
   
   > I think registering all KeyedStateHandles with the SharedStateRegistry on recovery in CLAIM mode would also solve the problem, wouldn't it?
   
   As for this suggestion, I think it may not work, because `discardState()` is **not empty** for some KeyedStateHandles, so their state would still be discarded when the checkpoint is subsumed.
   I also left a comment under [FLINK-25872](https://issues.apache.org/jira/browse/FLINK-25872); maybe we can discuss it in the ticket.
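
One way to read this concern is shown below in a toy model. The names `FileHandle`, `ToyRegistry`, `ToyCheckpoint` and `SubsumeDemo` are hypothetical and are not Flink's `SharedStateRegistry` or `CompletedCheckpoint`. Suppose subsuming a checkpoint calls `discardState()` directly on each handle it holds, and that method deletes files. In that case, counting references in a registry does not protect state that a newer changelog checkpoint still reuses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model with hypothetical names; not Flink's SharedStateRegistry or CompletedCheckpoint.

// A keyed state handle whose discardState() is NOT empty: it deletes its file.
final class FileHandle {
    final String file;

    FileHandle(String file) {
        this.file = file;
    }

    void discardState(Set<String> storage) {
        storage.remove(file);
    }
}

// Toy registry that only counts how many checkpoints reference each file.
final class ToyRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    void register(FileHandle handle) {
        refCounts.merge(handle.file, 1, Integer::sum);
    }

    int refCount(String file) {
        return refCounts.getOrDefault(file, 0);
    }
}

final class ToyCheckpoint {
    final List<FileHandle> handles;

    ToyCheckpoint(List<FileHandle> handles) {
        this.handles = handles;
    }

    // Subsumption path: the checkpoint discards the handles it holds directly,
    // without consulting the registry's reference counts.
    void discardOnSubsume(Set<String> storage) {
        for (FileHandle handle : handles) {
            handle.discardState(storage);
        }
    }
}

final class SubsumeDemo {
    // Returns whether the restored file still exists after the restored checkpoint is subsumed.
    static boolean restoredFileSurvives() {
        Set<String> storage = new HashSet<>(Arrays.asList("chk-1/keyed-0"));
        ToyRegistry registry = new ToyRegistry();
        FileHandle restored = new FileHandle("chk-1/keyed-0");

        // chk-1 was restored in CLAIM mode; register its handle as proposed.
        ToyCheckpoint chk1 = new ToyCheckpoint(Arrays.asList(restored));
        registry.register(restored);

        // chk-2, taken with changelog enabled, reuses the same handle as materialized state.
        ToyCheckpoint chk2 = new ToyCheckpoint(Arrays.asList(restored));
        registry.register(restored);

        // chk-2 completes and subsumes chk-1, although chk-2 still needs the file.
        chk1.discardOnSubsume(storage);
        return storage.contains(restored.file);
    }

    public static void main(String[] args) {
        System.out.println(restoredFileSurvives()); // false
    }
}
```

`SubsumeDemo.main` prints `false`. After chk-1 is subsumed, the restored file is gone, even though the handle was registered once for each checkpoint that references it.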



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org