Posted to issues@flink.apache.org by "Yanfei Lei (Jira)" <ji...@apache.org> on 2022/07/29 07:22:00 UTC

[jira] [Commented] (FLINK-28614) Empty local state folders not cleanup on retrieving local state

    [ https://issues.apache.org/jira/browse/FLINK-28614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572789#comment-17572789 ] 

Yanfei Lei commented on FLINK-28614:
------------------------------------

Solved in aef75be34d99c737f5c565703a971027ac44f855..52519a8eb695c9523c546439c66910b15f19be20.

See [this discussion|https://github.com/apache/flink/pull/19907#discussion_r926200002].

> Empty local state folders not cleanup on retrieving local state
> ---------------------------------------------------------------
>
>                 Key: FLINK-28614
>                 URL: https://issues.apache.org/jira/browse/FLINK-28614
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.0, 1.15.1, 1.16.0
>            Reporter: Yanfei Lei
>            Priority: Major
>             Fix For: 1.16.0
>
>
> {{getTaskStateSnapshotFile()}} creates the local checkpoint directory whenever Flink tries to load a {{TaskStateSnapshot}} from disk. If no {{TaskStateSnapshot}} exists for that checkpoint, the newly created directory is not deleted when {{tryLoadTaskStateSnapshotFromDisk()}} returns, so an empty folder is left behind.
>  
> {code:java}
> File getTaskStateSnapshotFile(long checkpointId) {
>     final File checkpointDirectory =
>             localRecoveryConfig
>                     .getLocalStateDirectoryProvider()
>                     .orElseThrow(
>                             () -> new IllegalStateException("Local recovery must be enabled."))
>                     .subtaskSpecificCheckpointDirectory(checkpointId);
>     if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) {
>         throw new FlinkRuntimeException(
>                 String.format(
>                         "Could not create the checkpoint directory '%s'", checkpointDirectory));
>     }
>     return new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME);
> } {code}
>  
>  
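> As a result, the empty checkpoint folder under {{localState}} remains after failover. In the example below, the job is restored from checkpoint 2, and {{chk_2}} is left behind as an empty directory: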
> {code:java}
> 41854 [flink-akka.actor.default-dispatcher-8] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 35644df535ca04613d6a6116dcfcfd59 from Checkpoint 2 @ 1658292943408 for 35644df535ca04613d6a6116dcfcfd59 located at file:/var/folders/4n/q3r37vws2f910rt_f469kwg00000gn/T/junit1426665332205293555/junit63847204117629783/35644df535ca04613d6a6116dcfcfd59/chk-2.
> _______________________________________
> directory of localState
> _______________________________________ 
> tm_2
>     │   ├── blobStorage
>     │   ├── localState
>     │   │   └── aid_6df21e53ca06ea69ee0643d25d27dbee
>     │   │       └── jid_35644df535ca04613d6a6116dcfcfd59
>     │   │           └── vtx_0a448493b4782967b150582570326227_sti_1
>     │   │               ├── chk_2
>     │   │               └── chk_5
>     │   │                   ├── _task_state_snapshot
>     │   │                   ├── edab98058083464a9ca29b6d7a950c68
>     │   │                   │   ├── 000014.sst
>     │   │                   │   ├── 000015.sst
>     │   │                   │   ├── 000022.sst
>     │   │                   │   ├── 000023.sst
>     │   │                   │   ├── CURRENT
>     │   │                   │   ├── MANIFEST-000018
>     │   │                   │   └── OPTIONS-000021
>     │   │                   └── f3724ae6-fd24-4e9a-80a8-02aa34bca0f0 {code}
> cc: [~trohrmann] , [~masteryhx] 
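
The behavior described in the issue can be illustrated with a minimal sketch, assuming the read path and the write path are kept separate so that a lookup never creates a directory. This is not necessarily how the referenced commits fix the bug. The class and method names (LocalSnapshotLookup, findTaskStateSnapshotFile, prepareTaskStateSnapshotFile) are hypothetical, and a plain java.io.File stands in for the directory returned by Flink's local state directory provider.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;

public class LocalSnapshotLookup {
    // File name taken from the directory listing in the issue.
    static final String TASK_STATE_SNAPSHOT_FILENAME = "_task_state_snapshot";

    /**
     * Read path (hypothetical): resolve the snapshot file without creating
     * the checkpoint directory. Returns empty if no snapshot file exists.
     */
    static Optional<File> findTaskStateSnapshotFile(File checkpointDirectory) {
        File snapshotFile = new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME);
        return snapshotFile.isFile() ? Optional.of(snapshotFile) : Optional.empty();
    }

    /**
     * Write path (hypothetical): create the checkpoint directory on demand,
     * then return the location where the snapshot file should be written.
     */
    static File prepareTaskStateSnapshotFile(File checkpointDirectory) throws IOException {
        Files.createDirectories(checkpointDirectory.toPath());
        return new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME);
    }

    public static void main(String[] args) throws IOException {
        File base = Files.createTempDirectory("localState").toFile();
        File chk2 = new File(base, "chk_2");

        // Looking up a missing snapshot leaves no directory behind.
        System.out.println(findTaskStateSnapshotFile(chk2).isPresent()); // false
        System.out.println(chk2.exists());                               // false

        // Only the write path creates the directory.
        File target = prepareTaskStateSnapshotFile(chk2);
        target.createNewFile();
        System.out.println(findTaskStateSnapshotFile(chk2).isPresent()); // true
    }
}
```

With this split, a failed restore attempt for checkpoint 2 would not leave an empty chk_2 folder, because only the code that actually writes a snapshot creates directories.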


