You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2018/12/06 20:54:00 UTC

[jira] [Comment Edited] (FLINK-10930) Refactor checkpoint directory layout

    [ https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712017#comment-16712017 ] 

Stephan Ewen edited comment on FLINK-10930 at 12/6/18 8:53 PM:
---------------------------------------------------------------

[~yunta] Better exception handling is definitely always a really good idea. I would suggest to proceed as follows:

 - We work towards a model where checkpoint failures never cause task failures, but only "decline messages". As fas as I know, [~azagrebin] mentioned that there is some discussion already. Could you chime in here, Andrey?

 - We fix all cases of state being left in exception cases that we find.

 - We introduce an exclusive state cleanup daemon (see below)

 - We introduce a shared state cleanup daemon (see below)

h3. Cleaning up exclusive state (idea)
 - when the daemon starts a cleanup sweep it determines what the oldest still retained checkpoint number is
 - it enumerates all directories "CHK-XXX" and recursively deletes the ones older than the oldest retained checkpoint

h3. Cleaning up shared state (idea)
 - every shared state file needs to encode the checkpoint-id of the checkpoint during which it was created
 - when the daemon does a cleanup swipe, it atomically grabs the number of the latest completed checkpoint, and a snapshot of all shared state handles in the shared state registry
 - the daemon then lists all files in the shared state directory and deletes all files that are neither referenced by a shared state handle and are older than the latest completed checkpoint

What do you think about that?


was (Author: stephanewen):
[~yunta] Better exception handling is definitely always a really good idea. I would suggest to proceed as follows:

  - We work towards a model where checkpoint failures never cause task failures, but only "decline messages". As fas as I know, [~azagrebin] mentioned that there is some discussion already. Could you chime in here, Andrey?

  - We fix all cases of state being left in exception cases that we find.

  - We introduce an exclusive state cleanup daemon (see below)

  - We introduce a shared state cleanup daemon (see below)

h3. Cleaning up exclusive state (idea)

  - when the daemon starts a cleanup sweep it determines what the oldest still retained checkpoint number is
  - it enumerates all directories "CHK-XXX" and recursively deletes the ones older than the oldest retained checkpoint


h3. Cleaning up shared state (idea)

  - every shared state file needs to encode the checkpoint-id of the checkpoint during which it was created
  - when the daemon does a cleanup swipe, it atomically grabs the number of the latest completed checkpoint, and a snapshot of all shared state handles in the shared state registry
  - the daemon then lists all files in the shared state directory and deletes all files that are neither referenced by a shared state handle and are older than the latest completed checkpoint


What do you think about that?


> Refactor checkpoint directory layout
> ------------------------------------
>
>                 Key: FLINK-10930
>                 URL: https://issues.apache.org/jira/browse/FLINK-10930
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.8.0
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.8.0
>
>
> The current checkpoint directory layout is introduced from FLINK-8531 with three different scopes for tasks:
>  * *EXCLUSIVE* is for state that belongs to one checkpoint only, meta data and operator state files.
>  * *SHARED* is for state that is possibly part of multiple checkpoints
>  * *TASKOWNED* is for state that must never by dropped by the jobManager.
> {code:java}
> /user-defined-dir/{job-id}
> 		    |
> 		    +-- shared/
> 	            +-- taskowned/
> 		    +-- chk-1/      // metadata and operator-state files
> 	            +-- chk-2/
>                     ...{code}
> If we just retain one complete checkpoint, the expected exclusive directory, which is the {{chk-id}} checkpoint directory, should only be one left. However, as FLINK-10855 interpreted, the failed/expired checkpoint directories would also be left. This is really confusing for users who [uses externalized checkpoint to resume job|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint], not to mention the checkpoint directory resource leak. 
>  As far as I could know, if the {{chk-id}} checkpoint directory still contains the operator state files, I have no idea how to clean the useless {{chk-id}} checkpoint directory gracefully. Once job manager dispose the failed/expired checkpoint, the target {{chk-id}} checkpoint directory would be deleted by JM. However, this directory would also be create by tasks who having not reported to JM. When {{checkpoint coordinator}} received those late expired tasks, it would discard those useless handles. However, if JM also plans to delete the empty parent folder, which is already unsupported after FLINK-8540, another task uploading operator state files would meet exception due to its writing target's parent directory has just been removed. Currently, we handle task checkpoint failure as task failure and the whole job would failover which is not we want.
> From what I see, I plan to separate *EXCLUSIVE* directory into two kind of exclusive directories, one is still several {{chk-id}} checkpoint directories but only contains its exclusive {{meta data}}, the other is just one directory named {{exclusive}} which containing the operator state files. Operator state files are exclusive to just one specified checkpoint, we could also add {{checkpoint-id}} within their file name to let users easily clean up.
>  The refactored directory layout should be :
> {code:java}
> /user-defined-dir/{job-id}
>                     |
> 		    +-- shared/
> 		    +-- taskowned/
> 	            +-- exclusive/    // operator state files
> 		    +-- chk-1/        // metadata
>                     +-- chk-2/
>                     ...{code}
>  
> This new directory layout would not affect users who use external checkpoint to resume jobs, since they still just give {{/user-defined-dir/job-id/chk-id}} path to resume job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)