Posted to dev@flink.apache.org by "Feifan Wang (Jira)" <ji...@apache.org> on 2021/08/24 17:01:00 UTC
[jira] [Created] (FLINK-23949) first incremental checkpoint after a
savepoint will degenerate into a full checkpoint
Feifan Wang created FLINK-23949:
-----------------------------------
Summary: first incremental checkpoint after a savepoint will degenerate into a full checkpoint
Key: FLINK-23949
URL: https://issues.apache.org/jira/browse/FLINK-23949
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Affects Versions: 1.13.2, 1.12.5, 1.11.4
Reporter: Feifan Wang
Attachments: image-2021-08-25-00-59-05-779.png
In RocksIncrementalSnapshotStrategy we record the uploaded RocksDB files for each checkpoint id in _materializedSstFiles_, and clean them up in _CheckpointListener#notifyCheckpointComplete_.
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
This works well when no savepoint is taken. But when a savepoint completes, this method removes the _materializedSstFiles_ entries of the previous checkpoints, even though the savepoint itself has no entry there. As a result, the first checkpoint after the savepoint has no previous files to reuse and must upload all RocksDB files.
!image-2021-08-25-00-59-05-779.png|width=1640,height=225!
The fix is simple. I propose changing CheckpointListener#notifyCheckpointComplete so that it only cleans up when the completed checkpoint id is present in _materializedSstFiles_:
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId
                && materializedSstFiles.keySet().contains(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
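To make the difference between the two versions easier to see, here is a minimal standalone sketch that reproduces the bookkeeping outside of Flink. It is not the actual RocksIncrementalSnapshotStrategy code: the class name, the requireKnownCheckpoint flag, recordIncrementalCheckpoint and reusableBaseFiles are hypothetical names made up for illustration, and the sketch assumes that the next incremental checkpoint looks up its base files by lastCompletedCheckpointId.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the bookkeeping described in this issue.
// Only materializedSstFiles and lastCompletedCheckpointId come from the issue;
// every other name here is an assumption made for the sketch.
class MaterializedSstFilesSketch {
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    // false = current behaviour, true = proposed behaviour
    private final boolean requireKnownCheckpoint;

    MaterializedSstFilesSketch(boolean requireKnownCheckpoint) {
        this.requireKnownCheckpoint = requireKnownCheckpoint;
    }

    // Records the files uploaded by an incremental checkpoint.
    // In this sketch a savepoint never calls this method.
    void recordIncrementalCheckpoint(long checkpointId, Set<String> sstFiles) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles);
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            boolean known = materializedSstFiles.containsKey(completedCheckpointId);
            if (completedCheckpointId > lastCompletedCheckpointId
                    && (!requireKnownCheckpoint || known)) {
                materializedSstFiles
                        .keySet()
                        .removeIf(checkpointId -> checkpointId < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    // Files the next incremental checkpoint could reuse (assumption: looked up by
    // lastCompletedCheckpointId). An empty set means everything must be uploaded.
    Set<String> reusableBaseFiles() {
        synchronized (materializedSstFiles) {
            Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
            return base == null ? Collections.<String>emptySet() : base;
        }
    }

    public static void main(String[] args) {
        for (boolean guarded : new boolean[] {false, true}) {
            MaterializedSstFilesSketch sketch = new MaterializedSstFilesSketch(guarded);
            sketch.recordIncrementalCheckpoint(
                    1L, new HashSet<>(Arrays.asList("000001.sst", "000002.sst")));
            sketch.notifyCheckpointComplete(1L);
            // Id 2 plays the role of a savepoint: it completes without a recorded entry.
            sketch.notifyCheckpointComplete(2L);
            System.out.println(
                    "guarded=" + guarded + " reusable=" + sketch.reusableBaseFiles().size());
        }
    }
}
```

Running main should print reusable=0 for the current behaviour (the entry for checkpoint 1 is dropped when the savepoint completes) and reusable=2 with the proposed guard (the savepoint notification is ignored, so checkpoint 1 stays available as a base).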
--
This message was sent by Atlassian Jira
(v8.3.4#803005)