Posted to issues@flink.apache.org by "Yanfei Lei (Jira)" <ji...@apache.org> on 2022/07/29 07:22:00 UTC
[jira] [Commented] (FLINK-28614) Empty local state folders not cleanup on retrieving local state
[ https://issues.apache.org/jira/browse/FLINK-28614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572789#comment-17572789 ]
Yanfei Lei commented on FLINK-28614:
------------------------------------
Solved in aef75be34d99c737f5c565703a971027ac44f855..52519a8eb695c9523c546439c66910b15f19be20.
See [this discussion|https://github.com/apache/flink/pull/19907#discussion_r926200002].
> Empty local state folders not cleanup on retrieving local state
> ---------------------------------------------------------------
>
> Key: FLINK-28614
> URL: https://issues.apache.org/jira/browse/FLINK-28614
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.0, 1.15.1, 1.16.0
> Reporter: Yanfei Lei
> Priority: Major
> Fix For: 1.16.0
>
>
> Trying to load a {{TaskStateSnapshot}} from disk creates the local checkpoint directory as a side effect, because {{getTaskStateSnapshotFile}} calls {{mkdirs()}} (see below). This directory is not deleted when {{tryLoadTaskStateSnapshotFromDisk()}} returns, even if no {{TaskStateSnapshot}} exists.
>
> {code:java}
> File getTaskStateSnapshotFile(long checkpointId) {
>     final File checkpointDirectory =
>             localRecoveryConfig
>                     .getLocalStateDirectoryProvider()
>                     .orElseThrow(
>                             () -> new IllegalStateException("Local recovery must be enabled."))
>                     .subtaskSpecificCheckpointDirectory(checkpointId);
>     // Side effect: the directory is created even when the caller only wants to read an
>     // existing snapshot, so a lookup that finds nothing leaves an empty chk_<id> folder.
>     if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) {
>         throw new FlinkRuntimeException(
>                 String.format(
>                         "Could not create the checkpoint directory '%s'", checkpointDirectory));
>     }
>     return new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME);
> }
> {code}
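One way to keep the read path free of side effects is to resolve the snapshot file without creating its parent directory, and to call {{mkdirs()}} only when a snapshot is actually written. The sketch below illustrates that split under stated assumptions: {{LocalSnapshotLookup}}, {{findExistingSnapshotFile}} and {{snapshotFileForWrite}} are hypothetical names, not Flink API, and this is not necessarily how the linked commits resolve the issue. It throws {{IllegalStateException}} instead of Flink's {{FlinkRuntimeException}} so that it compiles on its own.

```java
import java.io.File;
import java.util.Optional;

/**
 * Hypothetical helper (not part of Flink): separates the read path, which never
 * creates directories, from the write path, which creates the directory on demand.
 */
final class LocalSnapshotLookup {

    private LocalSnapshotLookup() {}

    /** Read path: returns the snapshot file only if it already exists; touches nothing on disk. */
    static Optional<File> findExistingSnapshotFile(File checkpointDirectory, String fileName) {
        // new File(...) and isFile() never create anything, even if the directory is missing.
        File snapshotFile = new File(checkpointDirectory, fileName);
        return snapshotFile.isFile() ? Optional.of(snapshotFile) : Optional.empty();
    }

    /** Write path: creates the checkpoint directory if needed, then returns the target file. */
    static File snapshotFileForWrite(File checkpointDirectory, String fileName) {
        if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) {
            throw new IllegalStateException(
                    String.format(
                            "Could not create the checkpoint directory '%s'", checkpointDirectory));
        }
        return new File(checkpointDirectory, fileName);
    }
}
```

With this split, a restore that finds no local snapshot for a checkpoint simply gets an empty {{Optional}} and leaves no folder behind.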
>
>
> As a result, empty checkpoint folders remain under {{localState}} after a failover. In the example below, the job is restored from checkpoint 2, and the {{chk_2}} folder is left empty:
> {code:java}
> 41854 [flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 35644df535ca04613d6a6116dcfcfd59 from Checkpoint 2 @ 1658292943408 for 35644df535ca04613d6a6116dcfcfd59 located at file:/var/folders/4n/q3r37vws2f910rt_f469kwg00000gn/T/junit1426665332205293555/junit63847204117629783/35644df535ca04613d6a6116dcfcfd59/chk-2.
> _______________________________________
> directory of localState
> _______________________________________
> tm_2
> ├── blobStorage
> ├── localState
> │   └── aid_6df21e53ca06ea69ee0643d25d27dbee
> │       └── jid_35644df535ca04613d6a6116dcfcfd59
> │           └── vtx_0a448493b4782967b150582570326227_sti_1
> │               ├── chk_2
> │               └── chk_5
> │                   ├── _task_state_snapshot
> │                   ├── edab98058083464a9ca29b6d7a950c68
> │                   │   ├── 000014.sst
> │                   │   ├── 000015.sst
> │                   │   ├── 000022.sst
> │                   │   ├── 000023.sst
> │                   │   ├── CURRENT
> │                   │   ├── MANIFEST-000018
> │                   │   └── OPTIONS-000021
> │                   └── f3724ae6-fd24-4e9a-80a8-02aa34bca0f0
> {code}
> cc: [~trohrmann], [~masteryhx]
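To detect the leftover folders shown above, for example in a test after a failover, one could walk the local state root and collect {{chk_*}} directories that contain no entries. This is a minimal sketch assuming the {{chk_<id>}} naming seen in the directory tree; {{EmptyCheckpointDirs}} is a hypothetical helper, and deleting such folders after the fact is a workaround rather than a fix for the side effect in {{getTaskStateSnapshotFile}}.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink) for finding and removing empty chk_* folders. */
final class EmptyCheckpointDirs {

    private EmptyCheckpointDirs() {}

    /** Returns every directory whose name starts with "chk_" and that has no entries. */
    static List<Path> findEmpty(Path localStateRoot) {
        try (Stream<Path> paths = Files.walk(localStateRoot)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(p -> {
                        Path name = p.getFileName(); // null only for a filesystem root
                        return name != null && name.toString().startsWith("chk_");
                    })
                    .filter(EmptyCheckpointDirs::isEmptyDirectory)
                    .collect(Collectors.toList()); // collect before any deletion happens
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static boolean isEmptyDirectory(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the empty chk_* directories under the root; returns how many were removed. */
    static int deleteEmpty(Path localStateRoot) {
        int deleted = 0;
        for (Path dir : findEmpty(localStateRoot)) {
            try {
                Files.delete(dir); // fails with DirectoryNotEmptyException if no longer empty
                deleted++;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return deleted;
    }
}
```

{{Files.delete}} refuses to remove a non-empty directory, so if a snapshot is written into a folder between the scan and the delete, the sketch surfaces an exception instead of silently discarding local state.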
--
This message was sent by Atlassian Jira
(v8.20.10#820010)