Posted to issues@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2023/04/03 07:03:00 UTC

[jira] [Commented] (FLINK-26606) CompletedCheckpoints that failed to be discarded are not stored in the CompletedCheckpointStore

    [ https://issues.apache.org/jira/browse/FLINK-26606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707789#comment-17707789 ] 

Matthias Pohl commented on FLINK-26606:
---------------------------------------

Moving the discussion from [PR #22121|https://github.com/apache/flink/pull/22121] into this parent issue:
{quote}
@XComp I'm interested in the background of your ticket. Based on the description, I think the key point of this ticket is that "CompletedCheckpoints are being discarded in CheckpointsCleaner". Could you provide the specific codepath for this? Additionally, I would like to learn more about "the contract of StateObject#discardState" . If these are clear, I would be happy to drive the entire issue.
{quote}

Repeatable cleanup was introduced in 1.15 with [FLIP-194|https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore]. The repeatable cleanup is executed when shutting down the Dispatcher. Usually, this also cleans up the {{CompletedCheckpoints}} when shutting down the {{CompletedCheckpointStore}}. But checkpoints are not only cleaned up when all jobs are finished. We also clean up (unused) checkpoints while the job is running to reduce memory usage. The number of checkpoints that are kept is defined by [state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained]. [FLIP-270|https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints] deals with the cleanup of these checkpoints.

The logic for this can be found in [CheckpointCoordinator.addCompletedCheckpointToStoreAndSubsumeOldest:1454|https://github.com/apache/flink/blob/eb17ec3f05d4bd512bc70ee79296d0b884894eaf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1454]. There you see how a more recent checkpoint is added to the {{CompletedCheckpointStore}} and the oldest one is removed and cleaned up. 
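
As a rough illustration of the add-and-subsume step described above (a minimal sketch, not the actual {{CheckpointCoordinator}} code; the class {{RetainedCheckpointsSketch}} and its methods are hypothetical, and {{maxRetained}} merely stands in for {{state.checkpoints.num-retained}}):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified stand-in for a store of retained checkpoints; not Flink's class. */
final class RetainedCheckpointsSketch {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();

    RetainedCheckpointsSketch(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be at least 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds the new checkpoint ID and returns the IDs that were subsumed (oldest first). */
    List<Long> addAndSubsumeOldest(long checkpointId) {
        retained.addLast(checkpointId);
        List<Long> subsumed = new ArrayList<>();
        while (retained.size() > maxRetained) {
            // The caller would hand these IDs to the cleanup logic.
            subsumed.add(retained.removeFirst());
        }
        return subsumed;
    }

    /** Returns a copy of the currently retained checkpoint IDs (oldest first). */
    List<Long> retained() {
        return new ArrayList<>(retained);
    }
}
```

In this sketch the subsumed IDs leave the store as soon as they are returned, so whether they are actually deleted depends entirely on what the caller does with them.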

Going through the code you will see that it utilizes the {{CheckpointsCleaner}}. But the {{CheckpointsCleaner}} will only try to discard the checkpoint. If the discard fails for any reason, no reference is kept; only a warning is logged. Semantically, it means that the ownership of the checkpoint is transitioned to the user through this log message. Flink is not taking care of the cleanup anymore. The user has to deal with it if they need to. The idea of [FLIP-270|https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints] is to add repeatable cleanup in the {{CheckpointsCleaner}} as well.
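
One way to picture the difference between "log and forget" and keeping a reference for a later retry is sketched below. This is only an illustration under stated assumptions, not the FLIP-270 design or Flink's {{CheckpointsCleaner}}: the class {{TrackingCleanerSketch}}, the {{Discardable}} interface and all method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical cleaner that keeps a reference to every checkpoint whose discard failed. */
final class TrackingCleanerSketch {

    /** Hypothetical discard action; throwing signals that the cleanup did not succeed. */
    interface Discardable {
        void discard() throws Exception;
    }

    private final List<Discardable> pending = new ArrayList<>();

    /** Tries to discard; on failure the reference is kept instead of only being logged. */
    void cleanOrTrack(Discardable checkpoint) {
        try {
            checkpoint.discard();
        } catch (Exception e) {
            pending.add(checkpoint);
        }
    }

    /** Retries every pending discard once and returns how many are still pending. */
    int retryPending() {
        List<Discardable> attempt = new ArrayList<>(pending);
        pending.clear();
        for (Discardable checkpoint : attempt) {
            cleanOrTrack(checkpoint);
        }
        return pending.size();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With such bookkeeping, a failed discard stays visible to the component that owns the cleanup, so a later retry (for example at shutdown) still knows about it.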

Repeatable cleanup (as it is implemented right now) repeats the logic as long as an exception is caught. If no exception appears during cleanup, the code will assume success and complete. And here is where this issue comes into play. We have to make sure for the checkpoints that the cleanup fails iff (i.e. if and only if) the cleanup didn't succeed and the resource is still available. That's why we need to make sure that the {{CompletedCheckpoint}} and its state are cleaned up idempotently: We don't want an error to appear if the artifacts do not exist anymore. Otherwise, repeatable cleanup would try forever.
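
To make the idempotency requirement concrete, here is a minimal sketch using plain local files. It is not Flink's {{StateObject#discardState}} implementation: the class {{IdempotentDiscardSketch}} and its methods are hypothetical, and the retry loop is bounded by {{maxAttempts}} only so that the example terminates.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helpers contrasting idempotent and non-idempotent discard. */
final class IdempotentDiscardSketch {

    /** Idempotent: an artifact that is already gone counts as a successful cleanup. */
    static void discardIdempotently(Path artifact) {
        try {
            Files.deleteIfExists(artifact);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Not idempotent: Files.delete throws if the artifact does not exist anymore. */
    static void discardStrictly(Path artifact) {
        try {
            Files.delete(artifact);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Repeats the cleanup as long as it throws, up to maxAttempts.
     * Returns the number of attempts used, or -1 if every attempt failed.
     */
    static int retryUntilNoException(Runnable cleanup, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                cleanup.run();
                return attempt;
            } catch (RuntimeException e) {
                // An exception is the only failure signal, so try again.
            }
        }
        return -1;
    }
}
```

For an artifact that has already been deleted, the idempotent variant lets the retry loop finish on the first attempt, while the strict variant fails on every attempt and would keep an unbounded retry loop running.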

I hope this context helps.

> CompletedCheckpoints that failed to be discarded are not stored in the CompletedCheckpointStore
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26606
>                 URL: https://issues.apache.org/jira/browse/FLINK-26606
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.15.0
>            Reporter: Matthias Pohl
>            Priority: Major
>
> We introduced a repeatable per-job cleanup after the job reached a globally-terminated state. It also tries to clean up the {{CompletedCheckpointStore}}. But we missed one code path where the {{CheckpointsCleaner}} tries to discard {{CompletedCheckpoints}}. The {{CompletedCheckpointStore}} does not hold any references to these {{CompletedCheckpoints}} anymore. The shutdown at the end is not able to clean these checkpoints up.
> We should not remove the {{CompletedCheckpoints}} from the {{CompletedCheckpointStore}} if the deletion failed. This would enable us to retry deleting these artifacts at the end of the job and consider them in the retryable cleanup as well.
> The documentation was updated to cover this issue. Fixing this issue should also include removing the corresponding paragraph from the documentation (see [related flink-docs PR|https://github.com/apache/flink/pull/19058]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)