You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Sandeep Kathula (Jira)" <ji...@apache.org> on 2022/01/14 20:40:00 UTC

[jira] [Created] (BEAM-13667) Users cannot provide their own sharding function when using FileIO

Sandeep Kathula created BEAM-13667:
--------------------------------------

             Summary: Users cannot provide their own sharding function when using FileIO
                 Key: BEAM-13667
                 URL: https://issues.apache.org/jira/browse/BEAM-13667
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core
    Affects Versions: 2.35.0
            Reporter: Sandeep Kathula


Beam uses RandomShardingFunction ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L834]) by default for sharding when using FileIO.write().

 

RandomShardingFuncction doesn’t work well with Flink Runner. Its assigning same key (hashDestination(destination, destinationCoder))  along with the shard number. ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L863])

This is causing different ShardedKeys going to same task slot. As a result, there is no equal distribution of files written from different task slots.

E.g. 2 files are written by task slot 1, 3 files written by task slot 2, 0 files written by other task slots.

 

As a result, we are seeing Out of Memory from the pods writing more files.

 

At [https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L695-L698]

There is an option to give a different sharding function. But there is no way for the user to mention different sharding function when using FileIO class.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)