Posted to dev@beam.apache.org by Reuven Lax <re...@google.com> on 2019/05/01 12:12:07 UTC
Re: Custom shardingFn for FileIO
I haven't looked at the PR in depth yet, but it appears that someone
running a pipeline today who then tries to update post this PR will have
the coder change to DefaultShardKeyCoder, even if they haven't picked any
custom function. Is that correct, or am I misreading things?
Reuven
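
The concern above is that state written with one key encoding cannot be read back once the coder changes. A minimal sketch of that failure mode follows, under loud assumptions: `OldShardKeyCodec` and `NewShardKeyCodec` are hypothetical stand-ins invented for this illustration, not Beam coders and not the `DefaultShardKeyCoder` from the PR, and the byte layouts are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical "old" layout: the shard number as a 4-byte int. */
class OldShardKeyCodec {
  static byte[] encode(int shard) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new DataOutputStream(bytes).writeInt(shard);
    return bytes.toByteArray();
  }
}

/** Hypothetical "new" layout: a string key followed by the shard number. */
class NewShardKeyCodec {
  static byte[] encode(String key, int shard) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(key);
    out.writeInt(shard);
    return bytes.toByteArray();
  }

  static int decodeShard(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    in.readUTF(); // skip the string key
    return in.readInt();
  }
}

class CoderUpdateDemo {
  /** True if bytes written with the old layout decode correctly with the new one. */
  static boolean oldStateReadable(int shard) {
    try {
      return NewShardKeyCodec.decodeShard(OldShardKeyCodec.encode(shard)) == shard;
    } catch (IOException e) {
      return false; // the old bytes do not follow the new layout
    }
  }

  /** True if bytes written with the new layout round-trip through the new decoder. */
  static boolean newStateReadable(String key, int shard) {
    try {
      return NewShardKeyCodec.decodeShard(NewShardKeyCodec.encode(key, shard)) == shard;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("old state readable by new codec: " + oldStateReadable(3));
    System.out.println("new state readable by new codec: " + newStateReadable("k", 3));
  }
}
```

The point of the sketch is only that a pipeline updated in place would be handed bytes in the old layout, which is why keeping the default key type and coder unchanged when no custom function is set matters.
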
On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com> wrote:
> Hm, what would be the scenario? Have version A running with original
> random sharding and then start version B where I change sharding to some
> custom function?
> So I have to enable the pipeline to digest old keys from GBK restored
> state and also work with new keys produced to GBK going forward?
>
> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Initial thought on PR: we usually try to limit changing coders in these
>> types of transforms to better support runners that allow in-place updates
>> of pipelines. Can this be done without changing the coder?
>>
>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> I have created a PR for enhancing WriteFiles for custom sharding
>>> function.
>>> https://github.com/apache/beam/pull/8438
>>>
>>> If this sort of change looks good, then the next step would be to use it in
>>> the Flink runner transform override. Let me know what you think.
>>>
>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>> PTransformOverride by runner.
>>>>
>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>> function for choosing the shard number (I assume the problem is that there
>>>>> are cases where the current random assignment is not good?). However, this
>>>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>>>> know details of the runner when writing their code.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>> FlinkTransformOverrides. We already use this to determine the number of
>>>>>> shards in case of Runner-determined sharding.
>>>>>>
>>>>>> Not sure if that would work for Jozef's case because setting the number
>>>>>> of shards is not enough. We want to set the shard key directly and that
>>>>>> logic is buried inside WriteFiles.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>> > FlinkRunner can insert a custom function to determine the sharding keys.
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>>>> >
>>>>>> > Right now, sharding can be specified only via a target `shardCount`,
>>>>>> > be it by the user or the runner. In addition to the configurable
>>>>>> > shardCount, I am proposing to also allow passing a function that lets
>>>>>> > the user (or runner) control how the shard is determined and what key
>>>>>> > is used to represent it:
>>>>>> >
>>>>>> > interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>> >     extends Serializable {
>>>>>> >   ShardKeyT assign(DestinationT destination, UserT element,
>>>>>> >       Integer shardCount);
>>>>>> > }
>>>>>> >
>>>>>> > The default implementation can be what is there right now => a random
>>>>>> > shard encapsulated as ShardedKey<Integer>.
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com> wrote:
>>>>>> >
>>>>>> > If sharding is not specified, then the semantics are
>>>>>> > "runner-determined sharding." The DataflowRunner already takes
>>>>>> > advantage of this to impose its own sharding if the user hasn't
>>>>>> > specified an explicit one. Could the Flink runner do the same
>>>>>> > instead of pushing this to the users?
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <mxm@apache.org> wrote:
>>>>>> >
>>>>>> > Hi Jozef,
>>>>>> >
>>>>>> > For sharding in FileIO there are basically two options:
>>>>>> >
>>>>>> > (1) num_shards ~= num_workers => bad spread of the load across workers
>>>>>> > (2) num_shards >> num_workers => good spread of the load across workers,
>>>>>> >     but huge number of files
>>>>>> >
>>>>>> > Your approach would give users control over the sharding keys such that
>>>>>> > they could be adjusted to spread load more evenly.
>>>>>> >
>>>>>> > I'd like to hear from Beam IO experts if that would make sense.
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Max
>>>>>> >
>>>>>> > On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>> > > Hello,
>>>>>> > >
>>>>>> > > Right now, if someone needs sharded files via FileIO, there is only
>>>>>> > > one option, which is random (round robin) shard assignment per
>>>>>> > > element, and it always uses ShardedKey<Integer> as a key for the GBK
>>>>>> > > which follows.
>>>>>> > >
>>>>>> > > I would like to generalize this and have a possibility to provide some
>>>>>> > > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>>> > > What I am mainly after is to have a possibility to provide an
>>>>>> > > optimisation for the Flink runtime and pass in a special function which
>>>>>> > > generates shard keys in a way that they are evenly spread among
>>>>>> > > workers (BEAM-5865).
>>>>>> > >
>>>>>> > > Would such an extension for FileIO make sense? If yes, I would create a
>>>>>> > > ticket for it and try to draft a PR.
>>>>>> > >
>>>>>> > > Best,
>>>>>> > > Jozef
>>>>>> >
>>>>>>
>>>>>
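
For readers following the thread, the interface proposed in the quoted messages can be written out as plain Java. This is only a sketch under stated assumptions: the `ShardingFunction` name and the shape of `assign` come from the quoted proposal, while `RandomSharding`, `HashSharding`, and the use of a bare `Integer` as the shard key (instead of Beam's `ShardedKey<Integer>`) are hypothetical simplifications so the example runs without Beam on the classpath. It is not the code from either PR.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

/** Shape of the proposed hook, following the quoted message. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
}

/** Hypothetical stand-in for the default described in the thread: a random shard. */
class RandomSharding<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    return ThreadLocalRandom.current().nextInt(shardCount);
  }
}

/** Hypothetical deterministic alternative: hash the element into a shard. */
class HashSharding<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    // floorMod keeps the result in [0, shardCount) even for negative hash codes.
    return Math.floorMod(element.hashCode(), shardCount);
  }
}

class ShardingFunctionDemo {
  public static void main(String[] args) {
    ShardingFunction<String, String, Integer> fn = new HashSharding<>();
    System.out.println("shard for record-1: " + fn.assign("out/", "record-1", 4));
  }
}
```

A runner-side override would supply its own implementation of the same interface, which is the hook the thread converges on placing in WriteFiles rather than FileIO.
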
Re: Custom shardingFn for FileIO
Posted by Jozef Vilcek <jo...@gmail.com>.
Yes, I was able to use it in Flink and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.
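
To make the "evenly spread among workers" point concrete, here is a small simulation. Everything in it is an assumption made for illustration: `workerFor` is a hypothetical routing function standing in for however a runner maps a key to a worker (it is not Flink's actual key-group assignment), and `balancedKeys` is just one naive way to search for keys that land on distinct workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ShardSpreadSketch {
  /** Hypothetical routing: mixes the key's bits, then takes it modulo the worker count. */
  static int workerFor(int key, int numWorkers) {
    int h = key * 0x9E3779B9; // cheap integer mixing, illustration only
    h ^= (h >>> 16);
    return Math.floorMod(h, numWorkers);
  }

  /** Number of distinct workers that receive at least one of the given keys. */
  static int busyWorkers(List<Integer> keys, int numWorkers) {
    Set<Integer> workers = new HashSet<>();
    for (int key : keys) {
      workers.add(workerFor(key, numWorkers));
    }
    return workers.size();
  }

  /** Naive keys: the shard numbers 0..numShards-1 used directly as keys. */
  static List<Integer> naiveKeys(int numShards) {
    List<Integer> keys = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      keys.add(i);
    }
    return keys;
  }

  /** Searches for one key per worker, so every shard lands on a different worker. */
  static List<Integer> balancedKeys(int numWorkers) {
    Integer[] keyForWorker = new Integer[numWorkers];
    int found = 0;
    for (int candidate = 0; found < numWorkers; candidate++) {
      int worker = workerFor(candidate, numWorkers);
      if (keyForWorker[worker] == null) {
        keyForWorker[worker] = candidate;
        found++;
      }
    }
    return new ArrayList<>(Arrays.asList(keyForWorker));
  }

  public static void main(String[] args) {
    int workers = 8;
    System.out.println("naive keys reach "
        + busyWorkers(naiveKeys(workers), workers) + " of " + workers + " workers");
    System.out.println("searched keys reach "
        + busyWorkers(balancedKeys(workers), workers) + " of " + workers + " workers");
  }
}
```

With as many shards as workers, hashed keys can collide on a worker and leave others idle, which matches option (1) in the quoted discussion. Choosing the keys deliberately avoids that without raising the shard count.
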
On Wed, May 8, 2019 at 7:19 AM Reuven Lax <re...@google.com> wrote:
> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jo...@gmail.com> wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> That coder is added as an extra re-map stage from the "original" key to
>>>> the new ShardAwareKey ... But the pipeline might get broken, I guess.
>>>> Very fair point. I am taking a second pass over this and will try to
>>>> simplify it much more.
Re: Custom shardingFn for FileIO
Posted by Reuven Lax <re...@google.com>.
So you were able to use this in Flink? Did you see performance gains?
On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jo...@gmail.com> wrote:
> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
> PR is at https://github.com/apache/beam/pull/8499
>
> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>
>> Great, let me know when to take another look at the PR!
>>
>> Reuven
>>
>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> That coder is added as an extra re-map stage from the "original" key to
>>> the new ShardAwareKey ... But the pipeline might get broken, I guess.
>>> Very fair point. I am taking a second pass over this and will try to
>>> simplify it much more.
Re: Custom shardingFn for FileIO
Posted by Jozef Vilcek <jo...@gmail.com>.
Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
PR is at https://github.com/apache/beam/pull/8499
On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
> Great, let me know when to take another look at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com> wrote:
>
>> That coder is added as an extra re-map stage from the "original" key to
>> the new ShardAwareKey ... But the pipeline might get broken, I guess.
>> Very fair point. I am taking a second pass over this and will try to
>> simplify it much more.
Re: Custom shardingFn for FileIO
Posted by Reuven Lax <re...@google.com>.
Great, let me know when to take another look at the PR!
Reuven
On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com> wrote:
> That coder is added as an extra re-map stage from the "original" key to
> the new ShardAwareKey ... But the pipeline might get broken, I guess.
> Very fair point. I am taking a second pass over this and will try to
> simplify it much more.
>
> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>
>> I haven't looked at the PR in depth yet, but it appears that someone
>> running a pipeline today who then tries to update post this PR will have
>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>> custom function. Is that correct, or am I misreading things?
>>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> Hm, what would be the scenario? Have version A running with original
>>> random sharding and then start version B where I change sharding to some
>>> custom function?
>>> So I have to enable the pipeline to digest old keys from GBK restored
>>> state and also work with new keys produced to GBK going forward?
>>>
>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Initial thought on PR: we usually try to limit changing coders in these
>>>> types of transforms to better support runners that allow in-place updates
>>>> of pipelines. Can this be done without changing the coder?
>>>>
>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>> function.
>>>>> https://github.com/apache/beam/pull/8438
>>>>>
>>>>> If this sort of change looks good, then the next step would be to use it
>>>>> in the Flink runner transform override. Let me know what you think.
>>>>>
>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>> PTransformOverride by runner.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>>> are cases where the current random assignment is not good?). However, this
>>>>>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>>>>>> know details of the runner when writing their code.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>> FlinkTransformOverrides. We already use this to determine the number of
>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>
>>>>>>>> Not sure if that would work for Jozef's case because setting the number
>>>>>>>> of shards is not enough. We want to set the shard key directly and that
>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>> > sharding keys.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>> jozo.vilcek@gmail.com
>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>> >
>>>>>>>> > Right now, sharding can be specified only via a target
>>>>>>>> > `shardCount`, be it by the user or the runner. Next to the
>>>>>>>> > configurable shardCount, I am proposing to also be able to pass a
>>>>>>>> > function which will allow the user (or runner) to control how the
>>>>>>>> > shard is determined and what key will be used to represent it:
>>>>>>>> >
>>>>>>>> > interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>> >     extends Serializable {
>>>>>>>> >   ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>> >       Integer shardCount);
>>>>>>>> > }
>>>>>>>> >
>>>>>>>> > Default implementation can be what is right now => random shard
>>>>>>>> > encapsulated as ShardedKey<Integer>.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>>> > <ma...@google.com>> wrote:
>>>>>>>> >
>>>>>>>> > If sharding is not specified, then the semantics are
>>>>>>>> > "runner-determined sharding." The DataflowRunner already takes
>>>>>>>> > advantage of this to impose its own sharding if the user hasn't
>>>>>>>> > specified an explicit one. Could the Flink runner do the same
>>>>>>>> > instead of pushing this to the users?
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>> > <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>> >
>>>>>>>> > Hi Jozef,
>>>>>>>> >
>>>>>>>> > For sharding in FileIO there are basically two options:
>>>>>>>> >
>>>>>>>> > (1) num_shards ~= num_workers => bad spread of the load across
>>>>>>>> >     workers
>>>>>>>> > (2) num_shards >> num_workers => good spread of the load across
>>>>>>>> >     workers, but huge number of files
>>>>>>>> >
>>>>>>>> > Your approach would give users control over the sharding keys
>>>>>>>> > such that they could be adjusted to spread load more evenly.
>>>>>>>> >
>>>>>>>> > I'd like to hear from Beam IO experts if that would make sense.
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Max
>>>>>>>> >
>>>>>>>> > On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>> > > Hello,
>>>>>>>> > >
>>>>>>>> > > Right now, if someone needs sharded files via FileIO, there is
>>>>>>>> > > only one option, which is random (round robin) shard assignment
>>>>>>>> > > per element, and it always uses ShardedKey<Integer> as a key for
>>>>>>>> > > the GBK which follows.
>>>>>>>> > >
>>>>>>>> > > I would like to generalize this and have a possibility to provide
>>>>>>>> > > some ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>>>>> > > What I am mainly after is to have a possibility to provide an
>>>>>>>> > > optimisation for the Flink runtime and pass in a special function
>>>>>>>> > > which generates shard keys in a way that they are evenly spread
>>>>>>>> > > among workers (BEAM-5865).
>>>>>>>> > >
>>>>>>>> > > Would such an extension for FileIO make sense? If yes, I would
>>>>>>>> > > create a ticket for it and try to draft a PR.
>>>>>>>> > >
>>>>>>>> > > Best,
>>>>>>>> > > Jozef
>>>>>>>> >
>>>>>>>>
>>>>>>>
Re: Custom shardingFn for FileIO
Posted by Jozef Vilcek <jo...@gmail.com>.
That coder is added as an extra re-map stage from the "original" key to the
new ShardAwareKey ... but the pipeline might get broken on update, I guess.
Very fair point. I am taking a second pass over this and will try
to simplify it much more.
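
For readers following the thread, the hook under discussion can be sketched in
plain Java roughly as below. This is only an illustration of the interface
proposed further down in the quoted messages, not the code in the PR and not
the Beam API: the class names RandomSharding, HashSharding and ShardingSketch
are hypothetical, and a bare Integer stands in for ShardedKey<Integer> so the
sketch has no Beam dependency.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical hook modelled on the interface proposed in this thread; not the Beam API.
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
}

// Mirrors the behaviour described in the thread as the current default:
// a random shard per element (a plain Integer here instead of ShardedKey<Integer>).
final class RandomSharding<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    return ThreadLocalRandom.current().nextInt(shardCount);
  }
}

// Illustrative custom function: derive the shard deterministically from the element.
final class HashSharding<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    // floorMod keeps the result in [0, shardCount) even for negative hash codes.
    return Math.floorMod(element.hashCode(), shardCount);
  }
}

final class ShardingSketch {
  public static void main(String[] args) {
    ShardingFunction<String, String, Integer> random = new RandomSharding<>();
    ShardingFunction<String, String, Integer> hashed = new HashSharding<>();
    for (String element : new String[] {"a", "b", "c"}) {
      System.out.println(element
          + " -> random " + random.assign("out", element, 4)
          + ", hashed " + hashed.assign("out", element, 4));
    }
  }
}
```

Both implementations keep Integer as the shard key type, so in this sketch
swapping the function would not by itself force a different key coder; whether
that carries over to WriteFiles depends on how the key is wrapped there.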
On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update post this PR will have
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the pipeline to digest old keys from GBK restored
>> state and also work with new keys produced to GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Initial thought on PR: we usually try to limit changing coders in these
>>> types of transforms to better support runners that allow in-place updates
>>> of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>> function.
>>>> https://github.com/apache/beam/pull/8438
>>>>
>>>> If this sort of change looks good, then the next step would be to use it in
>>>> the Flink runner transform override. Let me know what you think.
>>>>
>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>> PTransformOverride by runner.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>> are cases where the current random assignment is not good?). However, this
>>>>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>>>>> know details of the runner when writing their code.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>> of shards in case of Runner-determined sharding.
>>>>>>>
>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>> number of shards is not enough. We want to set the shard key directly
>>>>>>> and that logic is buried inside WriteFiles.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>> > keys.
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>> jozo.vilcek@gmail.com
>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> > Right now, sharding can be specified only via a target
>>>>>>> > `shardCount`, be it by the user or the runner. Next to the
>>>>>>> > configurable shardCount, I am proposing to also be able to pass a
>>>>>>> > function which will allow the user (or runner) to control how the
>>>>>>> > shard is determined and what key will be used to represent it:
>>>>>>> >
>>>>>>> > interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>> >     extends Serializable {
>>>>>>> >   ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>> >       Integer shardCount);
>>>>>>> > }
>>>>>>> >
>>>>>>> > Default implementation can be what is right now => random shard
>>>>>>> > encapsulated as ShardedKey<Integer>.
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>> > <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> > If sharding is not specified, then the semantics are
>>>>>>> > "runner-determined sharding." The DataflowRunner already takes
>>>>>>> > advantage of this to impose its own sharding if the user hasn't
>>>>>>> > specified an explicit one. Could the Flink runner do the same
>>>>>>> > instead of pushing this to the users?
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>> > <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>> >
>>>>>>> > Hi Jozef,
>>>>>>> >
>>>>>>> > For sharding in FileIO there are basically two options:
>>>>>>> >
>>>>>>> > (1) num_shards ~= num_workers => bad spread of the load across
>>>>>>> >     workers
>>>>>>> > (2) num_shards >> num_workers => good spread of the load across
>>>>>>> >     workers, but huge number of files
>>>>>>> >
>>>>>>> > Your approach would give users control over the sharding keys
>>>>>>> > such that they could be adjusted to spread load more evenly.
>>>>>>> >
>>>>>>> > I'd like to hear from Beam IO experts if that would make sense.
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Max
>>>>>>> >
>>>>>>> > On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>> > > Hello,
>>>>>>> > >
>>>>>>> > > Right now, if someone needs sharded files via FileIO, there is
>>>>>>> > > only one option, which is random (round robin) shard assignment
>>>>>>> > > per element, and it always uses ShardedKey<Integer> as a key for
>>>>>>> > > the GBK which follows.
>>>>>>> > >
>>>>>>> > > I would like to generalize this and have a possibility to provide
>>>>>>> > > some ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>>>> > > What I am mainly after is to have a possibility to provide an
>>>>>>> > > optimisation for the Flink runtime and pass in a special function
>>>>>>> > > which generates shard keys in a way that they are evenly spread
>>>>>>> > > among workers (BEAM-5865).
>>>>>>> > >
>>>>>>> > > Would such an extension for FileIO make sense? If yes, I would
>>>>>>> > > create a ticket for it and try to draft a PR.
>>>>>>> > >
>>>>>>> > > Best,
>>>>>>> > > Jozef
>>>>>>> >
>>>>>>>
>>>>>>
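
The trade-off Max describes in the quoted message (few shards spread badly
across workers, many shards spread well but produce many files) can be
illustrated with a small simulation. This is a rough sketch under stated
assumptions: the mix() function is a generic integer scrambler standing in for
whatever key hashing a runner applies, not Flink's actual key-group
assignment, and ShardSpreadSketch is a hypothetical name.

```java
import java.util.Arrays;

final class ShardSpreadSketch {
  // Counts how many shard keys 0..numShards-1 land on each worker when keys are
  // routed by hash modulo the worker count (an assumed, simplified partitioner).
  static int[] shardsPerWorker(int numShards, int numWorkers) {
    int[] counts = new int[numWorkers];
    for (int shard = 0; shard < numShards; shard++) {
      int worker = Math.floorMod(mix(shard), numWorkers);
      counts[worker]++;
    }
    return counts;
  }

  // Simple integer scrambler standing in for a runner's key hashing.
  static int mix(int x) {
    x ^= x >>> 16;
    x *= 0x45d9f3b;
    x ^= x >>> 16;
    return x;
  }

  public static void main(String[] args) {
    int workers = 8;
    // With as many shards as workers, hashing typically leaves some workers
    // with several shards and others with none.
    System.out.println("8 shards:    " + Arrays.toString(shardsPerWorker(8, workers)));
    // With many more shards than workers the counts even out, at the cost of
    // one output file per shard.
    System.out.println("1024 shards: " + Arrays.toString(shardsPerWorker(1024, workers)));
  }
}
```

A custom sharding function sidesteps this by choosing keys that are known to
map to distinct workers, which is the optimisation the original proposal asks
for.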