Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2019/05/17 08:51:00 UTC

[jira] [Comment Edited] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

    [ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842009#comment-16842009 ] 

vinoyang edited comment on FLINK-10855 at 5/17/19 8:50 AM:
-----------------------------------------------------------

[~richtesn] I invited you because this is related to the component we have been working on recently. If you do not mind, I can give a more detailed description:

Consider two tasks that belong to the same job and are snapshotting for the same checkpoint.

Task1's behavior on the JM side:
{code:java}
receiveDeclineMessage -> discardCheckpoint -> pendingCheckpoint.abort -> dispose (in finally block) -> CheckpointStorageLocation#disposeOnFailure (deletes cp dir){code}
Task2's behavior on the TM side:

It writes its snapshot data to the state backend, which creates the checkpoint location again, and then sends an ack message to the JM.

Task2's behavior on the JM side:
{code:java}
receiveAcknowledgeMessage -> discardSubtaskState (else branch)
{code}
{{PendingCheckpoint#abort}} may not cause the job to fail and recover via {{CheckpointFailureManager}}, and {{CheckpointCoordinator}} does not send a message cancelling the pending checkpoint to the other tasks that are still taking a snapshot. The directory recreated by Task2 is therefore left behind.

So I suggest we introduce a cleanup mechanism.
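
To make the sequence above concrete, here is a minimal sketch that replays it on a local temp directory. It does not use any Flink classes: the class and method names ({{LateCheckpointCleanupSketch}}, {{writeSubtaskState}}, {{discardSubtaskStateAndCleanUp}}, and so on) are hypothetical, and the cleanup rule shown (remove the parent directory if it is empty after the late state is discarded) is only one assumed option, not a decided design.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical simulation of the race; none of these names are Flink APIs. */
class LateCheckpointCleanupSketch {

    /** JM side, Task1 declined: the aborted checkpoint's directory is deleted. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order so that children are deleted before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** TM side, Task2: writing state recreates the checkpoint directory. */
    static Path writeSubtaskState(Path checkpointDir, String fileName) throws IOException {
        Files.createDirectories(checkpointDir);
        return Files.write(checkpointDir.resolve(fileName),
                "state".getBytes(StandardCharsets.UTF_8));
    }

    /** JM side, late ack today: only the state file is removed. */
    static void discardSubtaskState(Path stateFile) throws IOException {
        Files.deleteIfExists(stateFile);
    }

    /** Assumed cleanup: also remove the parent directory if nothing else is in it. */
    static void discardSubtaskStateAndCleanUp(Path stateFile) throws IOException {
        Files.deleteIfExists(stateFile);
        Path parent = stateFile.getParent();
        if (parent == null) {
            return;
        }
        try {
            Files.deleteIfExists(parent);
        } catch (DirectoryNotEmptyException e) {
            // Other subtasks still have state in this directory; leave it alone.
        }
    }

    /** Replays the sequence and reports whether the checkpoint directory survives. */
    static boolean directoryLeftBehind(boolean withCleanup) throws IOException {
        Path root = Files.createTempDirectory("checkpoints");
        Path checkpointDir = root.resolve("chk-1");
        try {
            Files.createDirectories(checkpointDir);
            deleteRecursively(checkpointDir);                       // Task1 declined
            Path state = writeSubtaskState(checkpointDir, "task2"); // Task2 on the TM
            if (withCleanup) {
                discardSubtaskStateAndCleanUp(state);               // late ack + cleanup
            } else {
                discardSubtaskState(state);                         // late ack only
            }
            return Files.exists(checkpointDir);
        } finally {
            deleteRecursively(root);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("left behind without cleanup: " + directoryLeftBehind(false));
        System.out.println("left behind with cleanup:    " + directoryLeftBehind(true));
    }
}
```

In this sketch the directory survives only when the late ack is handled without the extra step. A real fix would have to go through the checkpoint storage abstraction rather than {{java.nio.file}}, and would need to cope with several late tasks writing into the same directory.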


was (Author: yanghua):
[~richtesn] I invited you because this is related to the component we are working on. If you do not mind, I can give a more detailed description:

Consider two tasks in one job that are snapshotting for the same checkpoint.

Task1's behavior on the JM side:
{code:java}
receiveDeclineMessage -> discardCheckpoint -> pendingCheckpoint.abort -> dispose (in finally block) -> CheckpointStorageLocation#disposeOnFailure (deletes cp dir){code}
Task2's behavior on the TM side:

It writes its snapshot data to the state backend, which creates the checkpoint location again, and then sends an ack message to the JM.

Task2's behavior on the JM side:
{code:java}
receiveAcknowledgeMessage -> discardSubtaskState (else branch)
{code}
{{PendingCheckpoint#abort}} may not cause the job to fail and recover via {{CheckpointFailureManager}}, and {{CheckpointCoordinator}} does not send a message cancelling the pending checkpoint to the other tasks that are still taking a snapshot.

So I suggest we introduce a cleanup mechanism.

> CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-10855
>                 URL: https://issues.apache.org/jira/browse/FLINK-10855
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Major
>
> If an acknowledge checkpoint message arrives late or a checkpoint cannot be acknowledged, we discard the subtask state in the {{CheckpointCoordinator}}. What does not happen in this case is that we delete the parent directory of the checkpoint. This only happens when we call {{PendingCheckpoint#dispose}}.
> Due to this behaviour it can happen that a checkpoint fails (e.g. because a task is not ready) and we delete the checkpoint directory. Next, another task writes its checkpoint data to the checkpoint directory (thereby creating it again) and sends an acknowledge message back to the {{CheckpointCoordinator}}. The {{CheckpointCoordinator}} will realize that there is no longer a {{PendingCheckpoint}} and will discard the subtask state. This will remove the state files from the checkpoint directory but will leave the checkpoint directory itself untouched.


