You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Feifan Wang (Jira)" <ji...@apache.org> on 2021/08/24 17:01:00 UTC

[jira] [Created] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

Feifan Wang created FLINK-23949:
-----------------------------------

             Summary: first incremental checkpoint after a savepoint will degenerate into a full checkpoint
                 Key: FLINK-23949
                 URL: https://issues.apache.org/jira/browse/FLINK-23949
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / State Backends
    Affects Versions: 1.13.2, 1.12.5, 1.11.4
            Reporter: Feifan Wang
         Attachments: image-2021-08-25-00-59-05-779.png

In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files corresponding to the checkpoint id,and clean it in _CheckpointListener#notifyCheckpointComplete ._
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}{code}
 

This works well without savepoint, but when a savepoint is completed, it will clean up the _materializedSstFiles_ of the previous checkpoint. It leads to the first checkpoint after the savepoint must upload all files in rocksdb.

!image-2021-08-25-00-59-05-779.png|width=1640,height=225!

Solving the problem is also very simple, I propose to change CheckpointListener#notifyCheckpointComplete to the following form :

 
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId
                && materializedSstFiles.keySet().contains(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)