You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/08/09 03:09:36 UTC

[flink] branch master updated: [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f708d0ba42 [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
7f708d0ba42 is described below

commit 7f708d0ba42f727b3f8c3d77cef2108206cad2de
Author: Lihe Ma <ma...@163.com>
AuthorDate: Sat Aug 6 20:02:33 2022 +0800

    [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
---
 .../changelog/ChangelogStateBackendHandle.java     | 61 +++++++++++++++++-----
 ...dicMaterializationSwitchStateBackendITCase.java |  3 ++
 2 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 51433d10503..6cf86ec65af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -149,29 +151,64 @@ public interface ChangelogStateBackendHandle
         private static KeyedStateHandle castToAbsolutePath(
                 KeyedStateHandle originKeyedStateHandle) {
             // For KeyedStateHandle, only KeyGroupsStateHandle and IncrementalKeyedStateHandle
-            // contain streamStateHandle, and the checkpointedStateScope of
-            // IncrementalKeyedStateHandle
-            // is shared, no need to
-            // cast. So, only KeyGroupsStateHandle need to cast.
-            if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
-                    || originKeyedStateHandle instanceof KeyGroupsSavepointStateHandle) {
+            // contain streamStateHandle, and both of them need to be cast
+            // because both hold state handles of private checkpoint scope.
+            if (originKeyedStateHandle instanceof KeyGroupsSavepointStateHandle) {
                 return originKeyedStateHandle;
-            } else {
+            }
+            if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
                 StreamStateHandle streamStateHandle =
                         ((KeyGroupsStateHandle) originKeyedStateHandle).getDelegateStateHandle();
 
                 if (streamStateHandle instanceof FileStateHandle) {
-                    FileStateHandle fileStateHandle =
-                            new FileStateHandle(
-                                    ((FileStateHandle) streamStateHandle).getFilePath(),
-                                    streamStateHandle.getStateSize());
+                    StreamStateHandle fileStateHandle = restoreFileStateHandle(streamStateHandle);
                     return KeyGroupsStateHandle.restore(
                             ((KeyGroupsStateHandle) originKeyedStateHandle).getGroupRangeOffsets(),
                             fileStateHandle,
                             originKeyedStateHandle.getStateHandleId());
                 }
-                return originKeyedStateHandle;
             }
+            if (originKeyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+                IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
+                        (IncrementalRemoteKeyedStateHandle) originKeyedStateHandle;
+
+                StreamStateHandle castMetaStateHandle =
+                        restoreFileStateHandle(
+                                incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+                Map<StateHandleID, StreamStateHandle> castSharedStates =
+                        incrementalRemoteKeyedStateHandle.getSharedState().entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                Map.Entry::getKey,
+                                                e -> restoreFileStateHandle(e.getValue())));
+                Map<StateHandleID, StreamStateHandle> castPrivateStates =
+                        incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                Map.Entry::getKey,
+                                                e -> restoreFileStateHandle(e.getValue())));
+
+                return IncrementalRemoteKeyedStateHandle.restore(
+                        incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
+                        incrementalRemoteKeyedStateHandle.getKeyGroupRange(),
+                        incrementalRemoteKeyedStateHandle.getCheckpointId(),
+                        castSharedStates,
+                        castPrivateStates,
+                        castMetaStateHandle,
+                        incrementalRemoteKeyedStateHandle.getCheckpointedSize(),
+                        incrementalRemoteKeyedStateHandle.getStateHandleId());
+            }
+            return originKeyedStateHandle;
+        }
+
+        private static StreamStateHandle restoreFileStateHandle(
+                StreamStateHandle streamStateHandle) {
+            if (streamStateHandle instanceof FileStateHandle) {
+                return new FileStateHandle(
+                        ((FileStateHandle) streamStateHandle).getFilePath(),
+                        streamStateHandle.getStateSize());
+            }
+            return streamStateHandle;
         }
 
         @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
index 7c1f74b8ec6..088bafd78bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -57,6 +58,8 @@ public class ChangelogPeriodicMaterializationSwitchStateBackendITCase
     public void setup() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+        // reduce file threshold to reproduce FLINK-28843
+        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20b"));
         FsStateChangelogStorageFactory.configure(
                 configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1), 10);
         cluster =