Posted to user@beam.apache.org by Pavel Solomin <p....@gmail.com> on 2021/09/03 22:16:15 UTC

Flink Runner - FileIO writeDynamic() - shards skew

Hello!

My Beam application reads from Kinesis streams, applies a FixedWindow and
writes the results to S3 files. I am using FileIO writeDynamic() with
withNumShards(5).
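For context, here is a minimal sketch of the setup described above (not my exact code) - the stream name, bucket path, element type, and the destinationFor() routing function are all illustrative placeholders:

```java
// Read Kinesis records, convert to strings, window, and write sharded files.
PCollection<String> lines = pipeline
    .apply("ReadKinesis", KinesisIO.read()
        .withStreamName("my-stream"))                  // illustrative name
    .apply("ToLines", MapElements.into(TypeDescriptors.strings())
        .via(rec -> new String(rec.getDataAsBytes(), StandardCharsets.UTF_8)))
    .apply("Window", Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(5))));

lines.apply("WriteFiles", FileIO.<String, String>writeDynamic()
    .by(line -> destinationFor(line))                  // hypothetical routing fn
    .withDestinationCoder(StringUtf8Coder.of())
    .via(TextIO.sink())
    .to("s3://my-bucket/output/")                      // illustrative bucket
    .withNaming(dest -> FileIO.Write.defaultNaming(dest + "/part", ".txt"))
    .withNumShards(5));                                // the 5 shards mentioned above
```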

Beam version - 2.32
Flink version - 1.11

I also have autoBalanceWriteFilesShardingEnabled = true set in the pipeline
options.
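For reference, this is the FlinkPipelineOptions flag, which I pass on the command line along with the parallelism (values here are illustrative):

```
--runner=FlinkRunner \
--parallelism=18 \
--autoBalanceWriteFilesShardingEnabled=true
```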

What I sometimes observe in the Flink dashboard (in the ...
WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ... operator) is
that the resulting 5 shards are of approximately equal size, but quite often
the shards land on TaskManagers unevenly:

TaskManager 1 - 2 shards
TaskManager 2 - 3 shards
TaskManager 3 - 0 shards

Usually, the job parallelism is greater than 5 - in the range of 18 ... 36 -
which is on purpose, to avoid having too many small files in the output. For
the same reason I can't leave the runner's default sharding either: with it I
get dozens of files for each window pane. I could set withNumShards(10)
instead of 5, but that would require doubling the window size, which in turn
requires 2x more memory.

What are the general recommendations for such cases? Is there another config
or FileIO setting I should look at?

Thank you!

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>