Posted to issues@flink.apache.org by "Anton Kalashnikov (Jira)" <ji...@apache.org> on 2022/04/14 12:09:00 UTC

[jira] [Commented] (FLINK-26803) Merge small ChannelState file for Unaligned Checkpoint

    [ https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522261#comment-17522261 ] 

Anton Kalashnikov commented on FLINK-26803:
-------------------------------------------

[~fanrui], thanks for this idea. I did some research on this (extra thanks to [~roman] for the feedback and explanation). Some results:
 Every parallel instance (operator chain) creates:
* one file for input + output channel state
* one file for each operator in the chain
So `number of files = number of chains * parallelism + state backend * parallelism`
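
To make the formula concrete, here is a rough sketch of the arithmetic. It assumes every chain runs with the same parallelism, and it reads the "state backend" term as the number of operators that each write one state file per subtask; both are assumptions for illustration, and the numbers in `main` are hypothetical.

```java
// Rough estimate of the number of files written per checkpoint.
// Assumption: all chains share one parallelism value.
final class CheckpointFileEstimate {

    // One channel state file (input + output) per parallel instance of each chain.
    static long channelStateFiles(int chains, int parallelism) {
        return (long) chains * parallelism;
    }

    // One file per operator per parallel instance (assumed reading of "state backend").
    static long operatorStateFiles(int operators, int parallelism) {
        return (long) operators * parallelism;
    }

    static long totalFiles(int chains, int operators, int parallelism) {
        return channelStateFiles(chains, parallelism)
                + operatorStateFiles(operators, parallelism);
    }

    public static void main(String[] args) {
        // Hypothetical job: 10 chains, 25 operators, parallelism 2000.
        System.out.println(totalFiles(10, 25, 2000));
    }
}
```

Both terms grow linearly with parallelism, so the file count grows with the parallelism even when each file is tiny.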

As I understand it, the main reasons Flink works this way are simplicity of implementation and easy rescaling. Theoretically, the rescaling problem can be solved by storing an offset for each subtaskIndex, but there is still the problem of creating the `stream factory` on the `TaskManager`, since Flink doesn't know in advance all the tasks that will be executed on it. Reading would also have to change to avoid loading one big file multiple times.
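
The offset idea could look roughly like the sketch below. This is not Flink code: `MergedChannelStateSketch` and its methods are hypothetical names, and an in-memory buffer stands in for the shared file. Each subtask appends its channel state and the index records where that subtask's bytes start, so that on restore or rescaling each range can be located by subtaskIndex.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: several subtasks share one "file", indexed by subtaskIndex.
final class MergedChannelStateSketch {
    // Stand-in for the single merged file.
    private final ByteArrayOutputStream file = new ByteArrayOutputStream();
    // subtaskIndex -> {offset, length} of that subtask's bytes in the file.
    private final Map<Integer, int[]> index = new LinkedHashMap<>();

    void append(int subtaskIndex, byte[] state) {
        if (index.containsKey(subtaskIndex)) {
            throw new IllegalStateException("subtask already written: " + subtaskIndex);
        }
        int offset = file.size();
        file.write(state, 0, state.length);
        index.put(subtaskIndex, new int[] {offset, state.length});
    }

    int offsetOf(int subtaskIndex) {
        return entry(subtaskIndex)[0];
    }

    // Returns only the bytes of one subtask. A real reader would seek to the
    // offset instead of materializing the whole file as done here.
    byte[] read(int subtaskIndex) {
        int[] e = entry(subtaskIndex);
        return Arrays.copyOfRange(file.toByteArray(), e[0], e[0] + e[1]);
    }

    private int[] entry(int subtaskIndex) {
        int[] e = index.get(subtaskIndex);
        if (e == null) {
            throw new IllegalArgumentException("unknown subtask: " + subtaskIndex);
        }
        return e;
    }
}
```

The sketch only covers the layout. It does not address the harder parts mentioned above: who creates the shared stream on the TaskManager, and how readers avoid fetching the same large file repeatedly.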

You can also check the configuration option [state.storage.fs.memory-threshold|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-storage-fs-memory-threshold]. It reduces the number of files when the states are small enough, because state below the threshold is stored inline with the checkpoint metadata instead of in a separate file. Maybe it will help you.
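
For example, the threshold could be raised in the Flink configuration file as shown below. The value is only an illustration, not a recommendation; please check the linked documentation for the default and the permitted maximum.

```yaml
# flink-conf.yaml (example value only)
state.storage.fs.memory-threshold: 100kb
```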

In conclusion, I'm not really sure that it makes sense to implement this feature, since it looks quite complicated while the advantages are not so obvious. Of course, I may be wrong somewhere, and we can discuss it. In any case, I think we first need to collect all the advantages and try to understand a possible implementation.

> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
>                 Key: FLINK-26803
>                 URL: https://issues.apache.org/jira/browse/FLINK-26803
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>            Reporter: fanrui
>            Priority: Major
>
> When making an unaligned checkpoint, the number of ChannelState files is TaskNumber * subtaskNumber. For a high-parallelism job, this writes too many small files, which causes high load on the HDFS NameNode (NN).
>  
> In our production, a job writes more than 50K small files for each Unaligned Checkpoint. Could we merge these files before writing them to the FileSystem? We could make the maximum number of files each TM can write in a single Unaligned Checkpoint configurable.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)