Posted to dev@beam.apache.org by Reuven Lax <re...@google.com> on 2019/05/01 12:12:07 UTC

Re: Custom shardingFn for FileIO

I haven't looked at the PR in depth yet, but it appears that someone
running a pipeline today who then tries to update it after this PR will see
the coder change to DefaultShardKeyCoder, even if they haven't picked any
custom function. Is that correct, or am I misreading things?

Reuven

On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Hm, what would be the scenario? Have version A running with the original
> random sharding and then start version B where I change the sharding to
> some custom function?
> So I have to enable the pipeline to digest old keys from restored GBK
> state and also work with new keys produced for the GBK going forward?
>
> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Initial thought on PR: we usually try to limit changing coders in these
>> types of transforms to better support runners that allow in-place updates
>> of pipelines. Can this be done without changing the coder?
>>
>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>
>>> I have created a PR for enhancing WriteFiles with a custom sharding
>>> function.
>>> https://github.com/apache/beam/pull/8438
>>>
>>> If this sort of change looks good, then the next step would be to use it
>>> in a Flink runner transform override. Let me know what you think.
>>>
>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>
>>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>>> level rather than in FileIO. On WriteFiles, it can be manipulated by the
>>>> runner via a PTransformOverride.
>>>>
>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>> function for choosing the shard number (I assume the problem is that there
>>>>> are cases where the current random assignment is not good?). However, this
>>>>> can be set using a PTransformOverride; ideally we shouldn't force the user
>>>>> to know details of the runner when writing their code.
>>>>>
>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>>
>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>> of shards in case of runner-determined sharding.
>>>>>>
>>>>>> Not sure if that would work for Jozef's case, because setting the
>>>>>> number of shards is not enough. We want to set the shard key directly,
>>>>>> and that logic is buried inside WriteFiles.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>> > keys.
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>>>> >
>>>>>> >     Right now, sharding can be specified only via a target
>>>>>> >     `shardCount`, set by either the user or the runner. In addition
>>>>>> >     to a configurable shardCount, I am proposing to also allow
>>>>>> >     passing a function that lets the user (or runner) control how
>>>>>> >     the shard is determined and which key is used to represent it:
>>>>>> >
>>>>>> >     import java.io.Serializable;
>>>>>> >
>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>> >         extends Serializable {
>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>> >           Integer shardCount);
>>>>>> >     }
>>>>>> >
>>>>>> >     The default implementation can be what exists right now: a
>>>>>> >     random shard encapsulated as ShardedKey<Integer>.
>>>>>> >
>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com> wrote:
>>>>>> >
>>>>>> >         If sharding is not specified, then the semantics are
>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>> >         takes advantage of this to impose its own sharding if the
>>>>>> >         user hasn't specified an explicit one. Could the Flink runner
>>>>>> >         do the same instead of pushing this to the users?
>>>>>> >
>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <mxm@apache.org> wrote:
>>>>>> >
>>>>>> >             Hi Jozef,
>>>>>> >
>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>> >
>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>> >                 across workers
>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>> >                 across workers, but a huge number of files
>>>>>> >
>>>>>> >             Your approach would give users control over the sharding
>>>>>> >             keys such that they could be adjusted to spread load more
>>>>>> >             evenly.
>>>>>> >
>>>>>> >             I'd like to hear from Beam IO experts whether that would
>>>>>> >             make sense.
>>>>>> >
>>>>>> >             Thanks,
>>>>>> >             Max
>>>>>> >
>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>> >              > Hello,
>>>>>> >              >
>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>> >              > there is only one option: random (round-robin) shard
>>>>>> >              > assignment per element, which always uses
>>>>>> >              > ShardedKey<Integer> as the key for the GBK that follows.
>>>>>> >              >
>>>>>> >              > I would like to generalize this and make it possible to
>>>>>> >              > provide some ShardingFn[UserT, DestinationT, ShardKeyT]
>>>>>> >              > via FileIO. What I am mainly after is the possibility to
>>>>>> >              > provide an optimisation for the Flink runtime and pass in
>>>>>> >              > a special function which generates shard keys in a way
>>>>>> >              > that spreads them evenly among workers (BEAM-5865).
>>>>>> >              >
>>>>>> >              > Would such an extension for FileIO make sense? If yes, I
>>>>>> >              > would create a ticket for it and try to draft a PR.
>>>>>> >              >
>>>>>> >              > Best,
>>>>>> >              > Jozef
>>>>>> >
>>>>>>
>>>>>
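
A minimal Java sketch of the kind of hook discussed in this thread, written in plain Java rather than against Beam's real classes. `ShardAssigner`, `ShardKey` and `RoundRobinAssigner` are hypothetical names: `ShardKey` is a stand-in for ShardedKey<Integer>, and the destination is hashed with `Objects.hashCode` purely for simplicity. One deliberate variation on the proposed interface: the function always returns the same fixed key type instead of a generic ShardKeyT, which is one way to keep the GBK key coder unchanged, the concern raised above about in-place pipeline updates. The default assigner approximates the current behaviour (round-robin assignment starting at a random offset); it is not the code from the PR.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical hook: picks the shard key for one element. The return type is fixed,
// so the key coder of the following GroupByKey does not depend on the chosen function.
interface ShardAssigner<UserT, DestinationT> extends Serializable {
  ShardKey assign(DestinationT destination, UserT element, int shardCount);
}

// Hypothetical stand-in for ShardedKey<Integer>: a destination hash plus a shard number.
final class ShardKey implements Serializable {
  final int destinationHash;
  final int shardNumber;

  ShardKey(int destinationHash, int shardNumber) {
    this.destinationHash = destinationHash;
    this.shardNumber = shardNumber;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ShardKey)) {
      return false;
    }
    ShardKey other = (ShardKey) o;
    return destinationHash == other.destinationHash && shardNumber == other.shardNumber;
  }

  @Override
  public int hashCode() {
    return Objects.hash(destinationHash, shardNumber);
  }

  @Override
  public String toString() {
    return "ShardKey(" + destinationHash + ", " + shardNumber + ")";
  }
}

// Default sketch: round-robin over shards, starting at a random offset.
final class RoundRobinAssigner<UserT, DestinationT> implements ShardAssigner<UserT, DestinationT> {
  // Transient, so every deserialized copy (e.g. one per worker) picks its own random offset.
  private transient boolean started;
  private transient int next;

  @Override
  public ShardKey assign(DestinationT destination, UserT element, int shardCount) {
    if (!started) {
      next = ThreadLocalRandom.current().nextInt(shardCount);
      started = true;
    }
    // floorMod keeps the shard in range even if shardCount changes between calls.
    int shard = Math.floorMod(next, shardCount);
    next = shard + 1;
    return new ShardKey(Objects.hashCode(destination), shard);
  }
}
```

Because every assigner returns `ShardKey`, swapping in a different one (for example, one a runner installs through a PTransformOverride) would not change the key type seen by the GroupByKey; only the distribution of shard numbers changes.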

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Yes, I was able to use it in Flink and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.
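
One way to get a more uniform load is to choose shard keys with the runner's key-to-worker mapping in mind. If each of W shard keys lands on a uniformly random worker, the expected number of idle workers is W(1 - 1/W)^W, roughly W/e; for W = 10 that is 10 * 0.9^10, about 3.5 idle workers, which illustrates why num_shards ~= num_workers spreads load badly. Below is a Java sketch of a worker-aware alternative. It assumes a hypothetical partitioner `workerFor` (MurmurHash3's 32-bit finalizer applied to `hashCode()`, modulo the worker count) rather than Flink's actual key-group assignment, and for each shard it searches for an integer key that this partitioner routes to the next worker in turn. `BalancedShardKeys` and its methods are illustrative names, not Beam or Flink API.

```java
// Hypothetical worker-aware shard key selection; not Beam or Flink code.
final class BalancedShardKeys {
  private BalancedShardKeys() {}

  // Assumed partitioner: MurmurHash3's 32-bit finalizer on the key's hashCode(),
  // reduced modulo the number of workers.
  static int workerFor(int key, int workers) {
    int h = Integer.hashCode(key);
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    h *= 0xc2b2ae35;
    h ^= h >>> 16;
    return Math.floorMod(h, workers);
  }

  // Returns one distinct key per shard such that shard i is routed to worker
  // (i % workers) under workerFor, spreading shards evenly over workers.
  static int[] keysFor(int shardCount, int workers) {
    int[] keys = new int[shardCount];
    int candidate = 0;
    for (int shard = 0; shard < shardCount; shard++) {
      int target = shard % workers;
      // Search upward for the next unused candidate that lands on the target worker.
      while (workerFor(candidate, workers) != target) {
        if (candidate == Integer.MAX_VALUE) {
          throw new IllegalStateException("no key found for worker " + target);
        }
        candidate++;
      }
      keys[shard] = candidate++;
    }
    return keys;
  }

  // Counts how many shard keys each worker receives under workerFor.
  static int[] shardsPerWorker(int[] keys, int workers) {
    int[] counts = new int[workers];
    for (int key : keys) {
      counts[workerFor(key, workers)]++;
    }
    return counts;
  }
}
```

For example, `shardsPerWorker(keysFor(10, 10), 10)` yields one shard per worker by construction, whereas naive keys 0..9 land wherever the hash happens to send them. Which keys count as "balanced" depends entirely on the partitioner, which is why this logic fits a runner-side override better than user code.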

On Wed, May 8, 2019 at 7:19 AM Reuven Lax <re...@google.com> wrote:

> So you were able to use this in Flink? Did you see performance gains?

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
So you were able to use this in Flink? Did you see performance gains?

On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
> PR is at https://github.com/apache/beam/pull/8499

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
PR is at https://github.com/apache/beam/pull/8499

On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
Great, let me know when to take another look at the PR!

Reuven

On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com> wrote:

> That coder is added as an extra re-map stage from the "original" key to the
> new ShardAwareKey ... but the pipeline might get broken, I guess.
> Very fair point. I am taking a second pass over this and will try
> to simplify it much more.
>
> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>
>> I haven't looked at the PR in depth yet, but it appears that someone
>> running a pipeline today who then tries to update post this PR will have
>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>> custom function. Is that correct, or am I misreading things?
>>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>
>>> Hm, what would be the scenario? Have version A running with original
>>> random sharding and then start version B where I change sharding to some
>>> custom function?
>>> So I have to enable the pipeline to digest old keys from GBK restored
>>> state and also work with new keys produced to GBK going forward?
>>>
>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Initial thought on PR: we usually try to limit changing coders in these
>>>> types of transforms to better support runners that allow in-place updates
>>>> of pipelines. Can this be done without changing the coder?
>>>>
>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>
>>>>> I have created a PR enhancing WriteFiles with support for a custom
>>>>> sharding function.
>>>>> https://github.com/apache/beam/pull/8438
>>>>>
>>>>> If this sort of change looks good, then the next step would be to use it
>>>>> in a Flink runner transform override. Let me know what you think.
>>>>>
>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>>
>>>>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>>>>> level rather than on FileIO. On WriteFiles it can be manipulated by the
>>>>>> runner via a PTransformOverride.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>>> are cases where the current random assignment is not good?). However, this
>>>>>>> can be set using PTransformOverride; we ideally shouldn't force the user to
>>>>>>> know details of the runner when writing their code.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>>>>
>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>>> of shards in case of runner-determined sharding.
>>>>>>>>
>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>> number of shards is not enough. We want to set the shard key directly,
>>>>>>>> and that logic is buried inside WriteFiles.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>>> > keys.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>> >
>>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>>> >     `shardCount`, be it by the user or the runner. In addition to a
>>>>>>>> >     configurable shardCount, I am proposing to also be able to pass a
>>>>>>>> >     function which lets the user (or runner) control how the shard is
>>>>>>>> >     determined and which key is used to represent it:
>>>>>>>> >
>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>> >         extends java.io.Serializable {
>>>>>>>> >       // Returns the key under which `element` is grouped before writing.
>>>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>> >           Integer shardCount);
>>>>>>>> >     }
>>>>>>>> >
>>>>>>>> >     The default implementation can be what exists right now => a random
>>>>>>>> >     shard encapsulated as ShardedKey<Integer>.
>>>>>>>> >
>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>>> >
>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>> >         "runner-determined sharding." The DataflowRunner already takes
>>>>>>>> >         advantage of this to impose its own sharding if the user hasn't
>>>>>>>> >         specified an explicit one. Could the Flink runner do the same
>>>>>>>> >         instead of pushing this to the users?
>>>>>>>> >
>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>> >
>>>>>>>> >             Hi Jozef,
>>>>>>>> >
>>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>>> >
>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>> >             across workers
>>>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>>>> >             across workers, but a huge number of files
>>>>>>>> >             Your approach would give users control over the
>>>>>>>> sharding
>>>>>>>> >             keys such that
>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>> >
>>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>>> make sense.
>>>>>>>> >
>>>>>>>> >             Thanks,
>>>>>>>> >             Max
>>>>>>>> >
>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>> >              > Hello,
>>>>>>>> >              >
>>>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>>>> >              > there is only one option, which is random (round robin)
>>>>>>>> >              > shard assignment per element, and it always uses
>>>>>>>> >              > ShardedKey<Integer> as the key for the GBK that follows.
>>>>>>>> >              >
>>>>>>>> >              > I would like to generalize this and make it possible to
>>>>>>>> >              > provide a ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>> >              > FileIO. What I am mainly after is the possibility to
>>>>>>>> >              > provide an optimisation for the Flink runtime and pass in
>>>>>>>> >              > a special function which generates shard keys in a way
>>>>>>>> >              > that they are evenly spread among workers (BEAM-5865).
>>>>>>>> >              >
>>>>>>>> >              > Would such an extension for FileIO make sense? If yes, I
>>>>>>>> >              > would create a ticket for it and try to draft a PR.
>>>>>>>> >              >
>>>>>>>> >              > Best,
>>>>>>>> >              > Jozef
>>>>>>>> >
>>>>>>>>
>>>>>>>
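The ShardingFunction proposal quoted above can be sketched as compilable Java. This is a minimal sketch under stated assumptions: `ShardingSketch`, `SimpleShardedKey` and `RoundRobinSharding` are hypothetical names (`SimpleShardedKey` stands in for Beam's `ShardedKey<Integer>` so the example needs no Beam dependency), and the default function follows the thread's description of random (round robin) assignment by starting at a random shard and then cycling. It is not the code in the PR or in `WriteFiles`.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

public class ShardingSketch {

  // Java rendering of the interface proposed in the thread (not a Beam API).
  public interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
    ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
  }

  // HYPOTHETICAL stand-in for Beam's ShardedKey<Integer>: a destination hash
  // plus a shard number in [0, shardCount).
  public static final class SimpleShardedKey implements Serializable {
    public final int key;
    public final int shardNumber;

    public SimpleShardedKey(int key, int shardNumber) {
      this.key = key;
      this.shardNumber = shardNumber;
    }

    @Override
    public String toString() {
      return "SimpleShardedKey{key=" + key + ", shardNumber=" + shardNumber + "}";
    }
  }

  // Default behaviour as described in the thread: pick a random first shard,
  // then assign subsequent elements round robin.
  public static final class RoundRobinSharding<UserT, DestinationT>
      implements ShardingFunction<UserT, DestinationT, SimpleShardedKey> {
    private int currentShard = -1; // -1 means "no shard picked yet"

    @Override
    public SimpleShardedKey assign(DestinationT destination, UserT element, Integer shardCount) {
      if (currentShard < 0) {
        currentShard = ThreadLocalRandom.current().nextInt(shardCount);
      } else {
        currentShard = (currentShard + 1) % shardCount;
      }
      return new SimpleShardedKey(Objects.hashCode(destination), currentShard);
    }
  }

  public static void main(String[] args) {
    ShardingFunction<String, String, SimpleShardedKey> fn = new RoundRobinSharding<>();
    for (String element : new String[] {"a", "b", "c", "d", "e"}) {
      System.out.println(fn.assign("output-dir", element, 3));
    }
  }
}
```

Because the shard key type is a type parameter, a runner override that swaps in a different function would usually also change the key coder used by the following GBK, which is the in-place update concern raised in this thread.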

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
That coder is added as an extra re-map stage from the "original" key to the
new ShardAwareKey ... But the pipeline might get broken, I guess.
Very fair point. I am making a second pass over this and will try
to simplify it much more.

On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:

> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update post this PR will have
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the pipeline to digest old keys from GBK restored
>> state and also work with new keys produced to GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Initial thought on PR: we usually try to limit changing coders in these
>>> types of transforms to better support runners that allow in-place updates
>>> of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>
>>>> I have created a PR enhancing WriteFiles with support for a custom
>>>> sharding function.
>>>> https://github.com/apache/beam/pull/8438
>>>>
>>>> If this sort of change looks good, then the next step would be to use it in
>>>> a Flink runner transform override. Let me know what you think.
>>>>
>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>
>>>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>>>> level rather than on FileIO. On WriteFiles it can be manipulated by the
>>>>> runner via a PTransformOverride.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>> are cases where the current random assignment is not good?). However, this
>>>>>> can be set using PTransformOverride; we ideally shouldn't force the user to
>>>>>> know details of the runner when writing their code.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>>>
>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>> of shards in case of runner-determined sharding.
>>>>>>>
>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>> number of shards is not enough. We want to set the shard key directly,
>>>>>>> and that logic is buried inside WriteFiles.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>> > keys.
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>> >     `shardCount`, be it by the user or the runner. In addition to a
>>>>>>> >     configurable shardCount, I am proposing to also be able to pass a
>>>>>>> >     function which lets the user (or runner) control how the shard is
>>>>>>> >     determined and which key is used to represent it:
>>>>>>> >
>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>> >         extends java.io.Serializable {
>>>>>>> >       // Returns the key under which `element` is grouped before writing.
>>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>> >           Integer shardCount);
>>>>>>> >     }
>>>>>>> >
>>>>>>> >     The default implementation can be what exists right now => a random
>>>>>>> >     shard encapsulated as ShardedKey<Integer>.
>>>>>>> >
>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>> >         "runner-determined sharding." The DataflowRunner already takes
>>>>>>> >         advantage of this to impose its own sharding if the user hasn't
>>>>>>> >         specified an explicit one. Could the Flink runner do the same
>>>>>>> >         instead of pushing this to the users?
>>>>>>> >
>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>> >
>>>>>>> >             Hi Jozef,
>>>>>>> >
>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>> >
>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>> >             across workers
>>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>>> >             across workers, but a huge number of files
>>>>>>> >
>>>>>>> >             Your approach would give users control over the sharding
>>>>>>> >             keys such that they could be adjusted to spread load more
>>>>>>> >             evenly.
>>>>>>> >
>>>>>>> >             I'd like to hear from Beam IO experts if that would make
>>>>>>> >             sense.
>>>>>>> >
>>>>>>> >             Thanks,
>>>>>>> >             Max
>>>>>>> >
>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>> >              > Hello,
>>>>>>> >              >
>>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>>> >              > there is only one option, which is random (round robin)
>>>>>>> >              > shard assignment per element, and it always uses
>>>>>>> >              > ShardedKey<Integer> as the key for the GBK that follows.
>>>>>>> >              >
>>>>>>> >              > I would like to generalize this and make it possible to
>>>>>>> >              > provide a ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>> >              > FileIO. What I am mainly after is the possibility to
>>>>>>> >              > provide an optimisation for the Flink runtime and pass in
>>>>>>> >              > a special function which generates shard keys in a way
>>>>>>> >              > that they are evenly spread among workers (BEAM-5865).
>>>>>>> >              >
>>>>>>> >              > Would such an extension for FileIO make sense? If yes, I
>>>>>>> >              > would create a ticket for it and try to draft a PR.
>>>>>>> >              >
>>>>>>> >              > Best,
>>>>>>> >              > Jozef
>>>>>>> >
>>>>>>>
>>>>>>
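Max's two options can be roughly quantified with a back-of-the-envelope balls-into-bins estimate. This assumes, as an idealisation, that each of the S shard keys is routed to one of W workers independently and uniformly at random. Real hash-based routing in Flink is deterministic, so treat this as an approximation, not a description of the runner. The limits below hold for large W.

```latex
\begin{align*}
\Pr[\text{worker } w \text{ gets no shard}] &= \left(1 - \tfrac{1}{W}\right)^{S} \\
\mathbb{E}[\#\text{idle workers}] &= W\left(1 - \tfrac{1}{W}\right)^{S} \\
S = W &: \quad \mathbb{E}[\#\text{idle workers}] \approx W e^{-1} \approx 0.37\,W \\
S = 10W &: \quad \mathbb{E}[\#\text{idle workers}] \approx W e^{-10} \approx 4.5 \times 10^{-5}\,W
\end{align*}
```

So with num_shards close to num_workers, about 37% of the workers can be expected to receive no shard at all, whereas ten shards per worker leave essentially none idle, at the cost of ten times as many files. A worker-aware sharding function, as proposed for BEAM-5865, aims for the even spread of option (2) with the file count of option (1).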