You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/08/09 03:09:36 UTC
[flink] branch master updated: [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7f708d0ba42 [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
7f708d0ba42 is described below
commit 7f708d0ba42f727b3f8c3d77cef2108206cad2de
Author: Lihe Ma <ma...@163.com>
AuthorDate: Sat Aug 6 20:02:33 2022 +0800
[FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
---
.../changelog/ChangelogStateBackendHandle.java | 61 +++++++++++++++++-----
...dicMaterializationSwitchStateBackendITCase.java | 3 ++
2 files changed, 52 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 51433d10503..6cf86ec65af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -149,29 +151,64 @@ public interface ChangelogStateBackendHandle
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, and both of them need to be cast
+ // as they all have state handles of private checkpoint scope.
+ if (originKeyedStateHandle instanceof KeyGroupsSavepointStateHandle) {
return originKeyedStateHandle;
- } else {
+ }
+ if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
StreamStateHandle streamStateHandle =
((KeyGroupsStateHandle) originKeyedStateHandle).getDelegateStateHandle();
if (streamStateHandle instanceof FileStateHandle) {
- FileStateHandle fileStateHandle =
- new FileStateHandle(
- ((FileStateHandle) streamStateHandle).getFilePath(),
- streamStateHandle.getStateSize());
+ StreamStateHandle fileStateHandle = restoreFileStateHandle(streamStateHandle);
return KeyGroupsStateHandle.restore(
((KeyGroupsStateHandle) originKeyedStateHandle).getGroupRangeOffsets(),
fileStateHandle,
originKeyedStateHandle.getStateHandleId());
}
- return originKeyedStateHandle;
}
+ if (originKeyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle) originKeyedStateHandle;
+
+ StreamStateHandle castMetaStateHandle =
+ restoreFileStateHandle(
+ incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+ Map<StateHandleID, StreamStateHandle> castSharedStates =
+ incrementalRemoteKeyedStateHandle.getSharedState().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e -> restoreFileStateHandle(e.getValue())));
+ Map<StateHandleID, StreamStateHandle> castPrivateStates =
+ incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e -> restoreFileStateHandle(e.getValue())));
+
+ return IncrementalRemoteKeyedStateHandle.restore(
+ incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
+ incrementalRemoteKeyedStateHandle.getKeyGroupRange(),
+ incrementalRemoteKeyedStateHandle.getCheckpointId(),
+ castSharedStates,
+ castPrivateStates,
+ castMetaStateHandle,
+ incrementalRemoteKeyedStateHandle.getCheckpointedSize(),
+ incrementalRemoteKeyedStateHandle.getStateHandleId());
+ }
+ return originKeyedStateHandle;
+ }
+
+ private static StreamStateHandle restoreFileStateHandle(
+ StreamStateHandle streamStateHandle) {
+ if (streamStateHandle instanceof FileStateHandle) {
+ return new FileStateHandle(
+ ((FileStateHandle) streamStateHandle).getFilePath(),
+ streamStateHandle.getStateSize());
+ }
+ return streamStateHandle;
}
@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
index 7c1f74b8ec6..088bafd78bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -57,6 +58,8 @@ public class ChangelogPeriodicMaterializationSwitchStateBackendITCase
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+ // reduce file threshold to reproduce FLINK-28843
+ configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20b"));
FsStateChangelogStorageFactory.configure(
configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1), 10);
cluster =