Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/13 09:10:15 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r849248992


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   I have several concerns about placing all this logic here in `CompletedCheckpoint`:
   1. `CompletedCheckpoint` is intended to be immutable; here, its contents are changed after it has potentially been used
   2. The programmer can make a mistake by registering this checkpoint with the registry using some other method (and not rebuilding the handles)
   3. `CompletedCheckpoint` is made aware of Changelog and some of its details, like `ChangelogStateBackendHandleImpl`
   
   How about moving all of this checkpoint-rebuilding code closer to where the checkpoint is loaded, i.e. `Checkpoints.loadAndValidateCheckpoint` and `DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoint`?
   
   Additionally, for (3), I don't think we can avoid **some** JM code being aware of changelog (unless we generalize this somehow, which is probably too early for now).
   But maybe this code can be put in a single changelog-related class, e.g. the existing `ChangelogStateBackendHandle` or some new utility class.
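
   To illustrate the direction suggested above, here is a minimal sketch. It does not use the real Flink classes: `Handle`, `Checkpoint`, `ChangelogRestoreUtil`, and `rebuildForChangelog` are hypothetical, simplified stand-ins chosen only for this example. The idea is that the load path builds a new immutable checkpoint with wrapped handles, so the checkpoint object itself is never mutated and does not need to know about changelog.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a keyed state handle (NOT the Flink type).
final class Handle {
    final String name;
    final boolean changelogWrapped;

    Handle(String name, boolean changelogWrapped) {
        this.name = name;
        this.changelogWrapped = changelogWrapped;
    }
}

// Hypothetical stand-in for an immutable completed checkpoint.
final class Checkpoint {
    private final long id;
    private final List<Handle> handles;

    Checkpoint(long id, List<Handle> handles) {
        this.id = id;
        // Defensive copy so later changes to the caller's list cannot leak in.
        this.handles = Collections.unmodifiableList(new ArrayList<>(handles));
    }

    long getId() {
        return id;
    }

    List<Handle> getHandles() {
        return handles;
    }
}

// Hypothetical utility: the only place that knows about changelog wrapping.
final class ChangelogRestoreUtil {
    private ChangelogRestoreUtil() {}

    // Returns a NEW checkpoint whose handles are all changelog-wrapped;
    // the loaded checkpoint is left untouched.
    static Checkpoint rebuildForChangelog(Checkpoint loaded) {
        List<Handle> rebuilt = new ArrayList<>();
        for (Handle h : loaded.getHandles()) {
            rebuilt.add(h.changelogWrapped ? h : new Handle(h.name, true));
        }
        return new Checkpoint(loaded.getId(), rebuilt);
    }
}
```

   In this sketch the loading code would call `ChangelogRestoreUtil.rebuildForChangelog(loaded)` once, right after loading and before the checkpoint is registered with any registry, so every later registration path sees the already rebuilt handles.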


