Posted to issues@flink.apache.org by "Alexander Trushev (Jira)" <ji...@apache.org> on 2020/12/04 06:16:00 UTC

[jira] [Comment Edited] (FLINK-20208) Remove outdated in-progress files in StreamingFileSink

    [ https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243716#comment-17243716 ] 

Alexander Trushev edited comment on FLINK-20208 at 12/4/20, 6:15 AM:
---------------------------------------------------------------------

[~gaoyunhaii], thank you very much for your answer.
 "it might not be the boundary to the pending files and in-progress files for all the bucket" - you are right, I missed that.

"One possible solution is to reset the counter for each bucket to maxPartCounter after the snapshot. It should also be able to solve the issue that buckets get created after snapshot. " - I can't say I fully understand your solution yet. If I need to, I'll ask about it later.

"it would be better if the cleanup method does not rely on the union list state and would also work for the new version of sinks" - I agree. FileSink also leaves outdated files that need to be deleted.

StreamingFileSink and FileSink create in-progress files in different ways. The old sink uses the subtaskIndex in the file name, while the new one uses a UUID. To make the cleanup method applicable to both sinks, we need to change the file creation method in the old sink to the new method without subtaskIndex.
 In this case, I see three subtasks:
 1) delete outdated files in the new sink
 2) change the creation method in the old sink to the new method without subtaskIndex and union list state
 3) delete outdated files in the old sink
 Do I understand you correctly about the cleanup method for both sinks? I'm not sure how difficult it will be to implement the second subtask so that it avoids the union list state.

I think the first subtask should be done first, because solving it may require adding some information to the in-progress file names or putting in-progress files in a subdirectory inside the bucket.
The result of working on the first subtask will help us assess the feasibility and complexity of the second and third.


> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> After the following sequence of events:
>  # Acknowledged checkpoint
>  # Event is written to new .part-X-Y.inprogress.UUID1
>  # Job failure
>  # Job recovery from the checkpoint
>  # Event is written to new .part-X-Y.inprogress.UUID2
> we are left with the outdated part file .part-X-Y.inprogress.UUID1, where X is the subtask index and Y is the part counter.
> *Proposal*
>  Add method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
>  Add a configurable parameter to OnCheckpointRollingPolicy and to DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts() (false by default).
> We can remove such outdated part files with the following algorithm while restoring the job from a checkpoint:
>  # After the bucket state is initialized, check shouldRemoveOutdatedParts. If it is true, continue with (2)
>  # For each bucket, scan the bucket directory
>  # If all three of the following conditions are true, remove the part file:
 the part filename contains "inprogress";
 the subtask index from the filename equals the current subtask index;
 the part counter from the filename is greater than or equal to the current max part counter.
> I propose removing outdated files because the similar proposal to overwrite outdated files has not been implemented:
> https://issues.apache.org/jira/browse/FLINK-11116
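
For illustration only, the three filename conditions in the proposal quoted above could be sketched in Java roughly as follows. OutdatedPartFilter, isOutdated and selectOutdated are hypothetical names and not Flink APIs. The sketch assumes the default "part" prefix and the .part-X-Y.inprogress.UUID layout described in the issue. It does not address the concern raised in the comment that the max part counter may not be a valid boundary for every bucket.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Flink) that applies the three conditions from the proposal. */
final class OutdatedPartFilter {

    // Assumed layout from the issue: .part-X-Y.inprogress.UUID
    // X = subtask index, Y = part counter. The "part" prefix is an assumption.
    private static final Pattern IN_PROGRESS =
            Pattern.compile("\\.part-(\\d+)-(\\d+)\\.inprogress\\..+");

    private OutdatedPartFilter() {}

    /** Returns true only if all three conditions from the proposal hold for this file name. */
    static boolean isOutdated(String fileName, long subtaskIndex, long maxPartCounter) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            return false; // condition 1: not an in-progress part file
        }
        long fileSubtask;
        long filePartCounter;
        try {
            fileSubtask = Long.parseLong(m.group(1));
            filePartCounter = Long.parseLong(m.group(2));
        } catch (NumberFormatException e) {
            return false; // digits too large to parse: leave the file alone
        }
        // condition 2: same subtask; condition 3: counter >= current max part counter
        return fileSubtask == subtaskIndex && filePartCounter >= maxPartCounter;
    }

    /** Filters the file names of one bucket directory down to the removal candidates. */
    static List<String> selectOutdated(List<String> fileNames, long subtaskIndex, long maxPartCounter) {
        return fileNames.stream()
                .filter(name -> isOutdated(name, subtaskIndex, maxPartCounter))
                .collect(Collectors.toList());
    }
}
```

The sketch only selects candidates from a list of names. Listing the bucket directory and deleting the files would go through the sink's file system and is left out here.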



--
This message was sent by Atlassian Jira
(v8.3.4#803005)