Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2019/04/25 06:52:49 UTC

Custom shardingFn for FileIO

Hello,

Right now, if someone needs sharded files via FileIO, there is only one
option, which is random (round-robin) shard assignment per element, and it
always uses ShardedKey<Integer> as the key for the GBK which follows.

I would like to generalize this and make it possible to provide a
ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
What I am mainly after is the possibility to provide an optimisation for
the Flink runtime and pass in a special function which generates shard keys
in a way that spreads them evenly among workers (BEAM-5865).
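
Roughly, I imagine something like the sketch below (names are illustrative
only, not a final API; the default just mirrors the current behaviour of
picking a random shard per element, which WriteFiles then wraps as
ShardedKey<Integer>):

    import java.io.Serializable;
    import java.util.concurrent.ThreadLocalRandom;

    /** Assigns an element (and its dynamic destination) to a shard key used by the GBK. */
    interface ShardingFn<UserT, DestinationT, ShardKeyT> extends Serializable {
      ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
    }

    /** Default behaviour: a random shard in [0, shardCount), as WriteFiles does today. */
    class RandomShardingFn<UserT, DestinationT>
        implements ShardingFn<UserT, DestinationT, Integer> {
      @Override
      public Integer assign(DestinationT destination, UserT element, int shardCount) {
        return ThreadLocalRandom.current().nextInt(shardCount);
      }
    }

A runner (or user) could then swap in an implementation whose keys are known
to spread evenly across workers.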

Would such an extension to FileIO make sense? If yes, I would create a
ticket for it and try to draft a PR.

Best,
Jozef

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Yes, I was able to use it in Flink and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.

On Wed, May 8, 2019 at 7:19 AM Reuven Lax <re...@google.com> wrote:

> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jo...@gmail.com> wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see it works and that proved too be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> That coder is added extra as a re-map stage from "original" key to new
>>>> ShardAwareKey ... But pipeline might get broken I guess.
>>>> Very fair point. I am having a second thought pass over this and will
>>>> try to simplify it much more
>>>>
>>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>>> running a pipeline today who then tries to update post this PR will have
>>>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>>>> custom function. Is that correct, or am I misreading things?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hm, what would be the scenario? Have version A running with original
>>>>>> random sharding and then start version B where I change sharding to some
>>>>>> custom function?
>>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>>> state and also work with new keys produced to GBK going forward?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>>>> these types of transforms to better support runners that allow in-place
>>>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>>>> function.
>>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>>
>>>>>>>> If this sort of change looks good, then next step would be to use
>>>>>>>> in in Flink runner transform override. Let me know what do you think
>>>>>>>>
>>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>>>> PTransformOverride by runner.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>>>> different function for choosing the shard number (I assume the problem is
>>>>>>>>>> that there are cases where the current random assignment is not good?).
>>>>>>>>>> However this can be set using PTransformOverride, we ideally shouldn't
>>>>>>>>>> force the user to know details of the runner when writing their code.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <
>>>>>>>>>> mxm@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>>>> number of
>>>>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>>>>
>>>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>>>> number
>>>>>>>>>>> of shards is not enough. We want to set the shard key directly
>>>>>>>>>>> and that
>>>>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>>>>
>>>>>>>>>>> -Max
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>>> > Actually the runner is free to perform surgery on the graph.
>>>>>>>>>>> The
>>>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>>>> sharding keys.
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>>>>> jozo.vilcek@gmail.com
>>>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>>>>> `shardCount`,
>>>>>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>>>>>> >     proposing to be able to pass also a function which will
>>>>>>>>>>> allow to the
>>>>>>>>>>> >     user (or runner) control how is shard determined and what
>>>>>>>>>>> key will
>>>>>>>>>>> >     be used to represent it
>>>>>>>>>>> >
>>>>>>>>>>> >     interface ShardingFunction[UserT, DestinationT,
>>>>>>>>>>> ShardKeyT]  extends
>>>>>>>>>>> >     Serializable {
>>>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT
>>>>>>>>>>> element,
>>>>>>>>>>> >     shardCount: Integer);
>>>>>>>>>>> >     }
>>>>>>>>>>> >
>>>>>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>>>>>> shard
>>>>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>>>>> >
>>>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <
>>>>>>>>>>> relax@google.com
>>>>>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>>>> >         "runner-determined sharding." The DataflowRunner
>>>>>>>>>>> already takes
>>>>>>>>>>> >         advantage of this to impose its own sharding if the
>>>>>>>>>>> user hasn't
>>>>>>>>>>> >         specified an explicit one. Could the Flink runner do
>>>>>>>>>>> the same
>>>>>>>>>>> >         instead of pushing this to the users?
>>>>>>>>>>> >
>>>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >             Hi Jozef,
>>>>>>>>>>> >
>>>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>>>> options:
>>>>>>>>>>> >
>>>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the
>>>>>>>>>>> load
>>>>>>>>>>> >             across workers
>>>>>>>>>>> >             (2) num_shards >> num_workers => good spread of
>>>>>>>>>>> the load
>>>>>>>>>>> >             across workers,
>>>>>>>>>>> >             but huge number of files
>>>>>>>>>>> >
>>>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>>>> sharding
>>>>>>>>>>> >             keys such that
>>>>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>>>>> >
>>>>>>>>>>> >             I'd like to hear from Beam IO experts if that
>>>>>>>>>>> would make sense.
>>>>>>>>>>> >
>>>>>>>>>>> >             Thanks,
>>>>>>>>>>> >             Max
>>>>>>>>>>> >
>>>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>>> >              > Hello,
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>>>>>> FileIO,
>>>>>>>>>>> >             there is only one
>>>>>>>>>>> >              > option which is random (round robin) shard
>>>>>>>>>>> assignment per
>>>>>>>>>>> >             element and it
>>>>>>>>>>> >              > always use ShardedKey<Integer> as a key for the
>>>>>>>>>>> GBK which
>>>>>>>>>>> >             follows.
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > I would like to generalize this and have a
>>>>>>>>>>> possibility to
>>>>>>>>>>> >             provide some
>>>>>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>>>>> FileIO.
>>>>>>>>>>> >              > What I am mainly after is, to have a
>>>>>>>>>>> possibility to
>>>>>>>>>>> >             provide optimisation
>>>>>>>>>>> >              > for Flink runtime and pass in a special
>>>>>>>>>>> function which
>>>>>>>>>>> >             generates shard
>>>>>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>>>>>> workers
>>>>>>>>>>> >             (BEAM-5865).
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Would such extension for FileIO make sense? If
>>>>>>>>>>> yes, I
>>>>>>>>>>> >             would create a
>>>>>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Best,
>>>>>>>>>>> >              > Jozef
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
So you were able to use this in Flink? Did you see performance gains?

On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see it works and that proved too be a bit bumpy.
> PR is at https://github.com/apache/beam/pull/8499
>
> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>
>> Great, let me know when to take another look at the PR!
>>
>> Reuven
>>
>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> That coder is added extra as a re-map stage from "original" key to new
>>> ShardAwareKey ... But pipeline might get broken I guess.
>>> Very fair point. I am having a second thought pass over this and will
>>> try to simplify it much more
>>>
>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>> running a pipeline today who then tries to update post this PR will have
>>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>>> custom function. Is that correct, or am I misreading things?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hm, what would be the scenario? Have version A running with original
>>>>> random sharding and then start version B where I change sharding to some
>>>>> custom function?
>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>> state and also work with new keys produced to GBK going forward?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>>> these types of transforms to better support runners that allow in-place
>>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>>> function.
>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>
>>>>>>> If this sort of change looks good, then next step would be to use in
>>>>>>> in Flink runner transform override. Let me know what do you think
>>>>>>>
>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>>> PTransformOverride by runner.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>>>>> are cases where the current random assignment is not good?). However this
>>>>>>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>>>>>>> know details of the runner when writing their code.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>>> number of
>>>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>>>
>>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>>> number
>>>>>>>>>> of shards is not enough. We want to set the shard key directly
>>>>>>>>>> and that
>>>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>> > Actually the runner is free to perform surgery on the graph.
>>>>>>>>>> The
>>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>>> sharding keys.
>>>>>>>>>> >
>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>>>> jozo.vilcek@gmail.com
>>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>>>> `shardCount`,
>>>>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>>>>> >     proposing to be able to pass also a function which will
>>>>>>>>>> allow to the
>>>>>>>>>> >     user (or runner) control how is shard determined and what
>>>>>>>>>> key will
>>>>>>>>>> >     be used to represent it
>>>>>>>>>> >
>>>>>>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>>>>>> extends
>>>>>>>>>> >     Serializable {
>>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT
>>>>>>>>>> element,
>>>>>>>>>> >     shardCount: Integer);
>>>>>>>>>> >     }
>>>>>>>>>> >
>>>>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>>>>> shard
>>>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>>>> >
>>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <
>>>>>>>>>> relax@google.com
>>>>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>>> >         "runner-determined sharding." The DataflowRunner
>>>>>>>>>> already takes
>>>>>>>>>> >         advantage of this to impose its own sharding if the
>>>>>>>>>> user hasn't
>>>>>>>>>> >         specified an explicit one. Could the Flink runner do
>>>>>>>>>> the same
>>>>>>>>>> >         instead of pushing this to the users?
>>>>>>>>>> >
>>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >             Hi Jozef,
>>>>>>>>>> >
>>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>>> options:
>>>>>>>>>> >
>>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the
>>>>>>>>>> load
>>>>>>>>>> >             across workers
>>>>>>>>>> >             (2) num_shards >> num_workers => good spread of the
>>>>>>>>>> load
>>>>>>>>>> >             across workers,
>>>>>>>>>> >             but huge number of files
>>>>>>>>>> >
>>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>>> sharding
>>>>>>>>>> >             keys such that
>>>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>>>> >
>>>>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>>>>> make sense.
>>>>>>>>>> >
>>>>>>>>>> >             Thanks,
>>>>>>>>>> >             Max
>>>>>>>>>> >
>>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>> >              > Hello,
>>>>>>>>>> >              >
>>>>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>>>>> FileIO,
>>>>>>>>>> >             there is only one
>>>>>>>>>> >              > option which is random (round robin) shard
>>>>>>>>>> assignment per
>>>>>>>>>> >             element and it
>>>>>>>>>> >              > always use ShardedKey<Integer> as a key for the
>>>>>>>>>> GBK which
>>>>>>>>>> >             follows.
>>>>>>>>>> >              >
>>>>>>>>>> >              > I would like to generalize this and have a
>>>>>>>>>> possibility to
>>>>>>>>>> >             provide some
>>>>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>>>> FileIO.
>>>>>>>>>> >              > What I am mainly after is, to have a possibility
>>>>>>>>>> to
>>>>>>>>>> >             provide optimisation
>>>>>>>>>> >              > for Flink runtime and pass in a special function
>>>>>>>>>> which
>>>>>>>>>> >             generates shard
>>>>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>>>>> workers
>>>>>>>>>> >             (BEAM-5865).
>>>>>>>>>> >              >
>>>>>>>>>> >              > Would such extension for FileIO make sense? If
>>>>>>>>>> yes, I
>>>>>>>>>> >             would create a
>>>>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>>>>> >              >
>>>>>>>>>> >              > Best,
>>>>>>>>>> >              > Jozef
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit
bumpy. The PR is at https://github.com/apache/beam/pull/8499

On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com> wrote:
>
>> That coder is added extra as a re-map stage from "original" key to new
>> ShardAwareKey ... But pipeline might get broken I guess.
>> Very fair point. I am having a second thought pass over this and will try
>> to simplify it much more
>>
>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I haven't looked at the PR in depth yet, but it appears that someone
>>> running a pipeline today who then tries to update post this PR will have
>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>> custom function. Is that correct, or am I misreading things?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> Hm, what would be the scenario? Have version A running with original
>>>> random sharding and then start version B where I change sharding to some
>>>> custom function?
>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>> state and also work with new keys produced to GBK going forward?
>>>>
>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>> these types of transforms to better support runners that allow in-place
>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>> function.
>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>
>>>>>> If this sort of change looks good, then next step would be to use in
>>>>>> in Flink runner transform override. Let me know what do you think
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>> PTransformOverride by runner.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>>>> are cases where the current random assignment is not good?). However this
>>>>>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>>>>>> know details of the runner when writing their code.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>> number of
>>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>>
>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>> number
>>>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>>>> that
>>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>> sharding keys.
>>>>>>>>> >
>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>>> jozo.vilcek@gmail.com
>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>>> `shardCount`,
>>>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>>>> >     proposing to be able to pass also a function which will
>>>>>>>>> allow to the
>>>>>>>>> >     user (or runner) control how is shard determined and what
>>>>>>>>> key will
>>>>>>>>> >     be used to represent it
>>>>>>>>> >
>>>>>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>>>>> extends
>>>>>>>>> >     Serializable {
>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>>> >     shardCount: Integer);
>>>>>>>>> >     }
>>>>>>>>> >
>>>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>>>> shard
>>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>>> >
>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>>>> takes
>>>>>>>>> >         advantage of this to impose its own sharding if the user
>>>>>>>>> hasn't
>>>>>>>>> >         specified an explicit one. Could the Flink runner do the
>>>>>>>>> same
>>>>>>>>> >         instead of pushing this to the users?
>>>>>>>>> >
>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>>> >
>>>>>>>>> >             Hi Jozef,
>>>>>>>>> >
>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>> options:
>>>>>>>>> >
>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the
>>>>>>>>> load
>>>>>>>>> >             across workers
>>>>>>>>> >             (2) num_shards >> num_workers => good spread of the
>>>>>>>>> load
>>>>>>>>> >             across workers,
>>>>>>>>> >             but huge number of files
>>>>>>>>> >
>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>> sharding
>>>>>>>>> >             keys such that
>>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>>> >
>>>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>>>> make sense.
>>>>>>>>> >
>>>>>>>>> >             Thanks,
>>>>>>>>> >             Max
>>>>>>>>> >
>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>> >              > Hello,
>>>>>>>>> >              >
>>>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>>>> FileIO,
>>>>>>>>> >             there is only one
>>>>>>>>> >              > option which is random (round robin) shard
>>>>>>>>> assignment per
>>>>>>>>> >             element and it
>>>>>>>>> >              > always use ShardedKey<Integer> as a key for the
>>>>>>>>> GBK which
>>>>>>>>> >             follows.
>>>>>>>>> >              >
>>>>>>>>> >              > I would like to generalize this and have a
>>>>>>>>> possibility to
>>>>>>>>> >             provide some
>>>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>>> FileIO.
>>>>>>>>> >              > What I am mainly after is, to have a possibility
>>>>>>>>> to
>>>>>>>>> >             provide optimisation
>>>>>>>>> >              > for Flink runtime and pass in a special function
>>>>>>>>> which
>>>>>>>>> >             generates shard
>>>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>>>> workers
>>>>>>>>> >             (BEAM-5865).
>>>>>>>>> >              >
>>>>>>>>> >              > Would such extension for FileIO make sense? If
>>>>>>>>> yes, I
>>>>>>>>> >             would create a
>>>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>>>> >              >
>>>>>>>>> >              > Best,
>>>>>>>>> >              > Jozef
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
Great, let me know when to take another look at the PR!

Reuven

On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jo...@gmail.com> wrote:

> That coder is added extra as a re-map stage from "original" key to new
> ShardAwareKey ... But pipeline might get broken I guess.
> Very fair point. I am having a second thought pass over this and will try
> to simplify it much more
>
> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>
>> I haven't looked at the PR in depth yet, but it appears that someone
>> running a pipeline today who then tries to update post this PR will have
>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>> custom function. Is that correct, or am I misreading things?
>>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> Hm, what would be the scenario? Have version A running with original
>>> random sharding and then start version B where I change sharding to some
>>> custom function?
>>> So I have to enable the pipeline to digest old keys from GBK restored
>>> state and also work with new keys produced to GBK going forward?
>>>
>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Initial thought on PR: we usually try to limit changing coders in these
>>>> types of transforms to better support runners that allow in-place updates
>>>> of pipelines. Can this be done without changing the coder?
>>>>
>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>> function.
>>>>> https://github.com/apache/beam/pull/8438
>>>>>
>>>>> If this sort of change looks good, then next step would be to use in
>>>>> in Flink runner transform override. Let me know what do you think
>>>>>
>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>> PTransformOverride by runner.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>>> are cases where the current random assignment is not good?). However this
>>>>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>>>>> know details of the runner when writing their code.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>> number of
>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>
>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>> number
>>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>>> that
>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>> sharding keys.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>> jozo.vilcek@gmail.com
>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>> >
>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>> `shardCount`,
>>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>>> >     proposing to be able to pass also a function which will allow
>>>>>>>> to the
>>>>>>>> >     user (or runner) control how is shard determined and what key
>>>>>>>> will
>>>>>>>> >     be used to represent it
>>>>>>>> >
>>>>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>>>> extends
>>>>>>>> >     Serializable {
>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>> >     shardCount: Integer);
>>>>>>>> >     }
>>>>>>>> >
>>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>>> shard
>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>> >
>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>>> >
>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>>> takes
>>>>>>>> >         advantage of this to impose its own sharding if the user
>>>>>>>> hasn't
>>>>>>>> >         specified an explicit one. Could the Flink runner do the
>>>>>>>> same
>>>>>>>> >         instead of pushing this to the users?
>>>>>>>> >
>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>>> >
>>>>>>>> >             Hi Jozef,
>>>>>>>> >
>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>> options:
>>>>>>>> >
>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the
>>>>>>>> load
>>>>>>>> >             across workers
>>>>>>>> >             (2) num_shards >> num_workers => good spread of the
>>>>>>>> load
>>>>>>>> >             across workers,
>>>>>>>> >             but huge number of files
>>>>>>>> >
>>>>>>>> >             Your approach would give users control over the
>>>>>>>> sharding
>>>>>>>> >             keys such that
>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>> >
>>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>>> make sense.
>>>>>>>> >
>>>>>>>> >             Thanks,
>>>>>>>> >             Max
>>>>>>>> >
>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>> >              > Hello,
>>>>>>>> >              >
>>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>>> FileIO,
>>>>>>>> >             there is only one
>>>>>>>> >              > option which is random (round robin) shard
>>>>>>>> assignment per
>>>>>>>> >             element and it
>>>>>>>> >              > always use ShardedKey<Integer> as a key for the
>>>>>>>> GBK which
>>>>>>>> >             follows.
>>>>>>>> >              >
>>>>>>>> >              > I would like to generalize this and have a
>>>>>>>> possibility to
>>>>>>>> >             provide some
>>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>> FileIO.
>>>>>>>> >              > What I am mainly after is, to have a possibility to
>>>>>>>> >             provide optimisation
>>>>>>>> >              > for Flink runtime and pass in a special function
>>>>>>>> which
>>>>>>>> >             generates shard
>>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>>> workers
>>>>>>>> >             (BEAM-5865).
>>>>>>>> >              >
>>>>>>>> >              > Would such extension for FileIO make sense? If
>>>>>>>> yes, I
>>>>>>>> >             would create a
>>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>>> >              >
>>>>>>>> >              > Best,
>>>>>>>> >              > Jozef
>>>>>>>> >
>>>>>>>>
>>>>>>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
That coder is added by an extra re-map stage from the "original" key to the
new ShardAwareKey ... But the pipeline might get broken, I guess.
Very fair point. I am taking a second pass over this and will try
to simplify it much more.
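
Roughly, that re-map stage is just a small extra step before the GBK
(a simplified sketch; ShardAwareKey here is only an illustrative stand-in
for the key type used in the PR, and a real key type would also need
equals/hashCode and a coder such as the DefaultShardKeyCoder):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Illustrative key type produced by the custom sharding function.
    class ShardAwareKey implements java.io.Serializable {
      final int shard;
      ShardAwareKey(int shard) { this.shard = shard; }
    }

    // Re-maps the randomly assigned shard number to the new key type. Because the
    // GBK key type changes, its key coder changes too, which is what can break
    // in-place pipeline updates.
    class RemapToShardAwareKey<UserT>
        extends DoFn<KV<Integer, UserT>, KV<ShardAwareKey, UserT>> {
      @ProcessElement
      public void process(@Element KV<Integer, UserT> in,
                          OutputReceiver<KV<ShardAwareKey, UserT>> out) {
        out.output(KV.of(new ShardAwareKey(in.getKey()), in.getValue()));
      }
    }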

On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:

> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update post this PR will have
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the pipeline to digest old keys from GBK restored
>> state and also work with new keys produced to GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Initial thought on PR: we usually try to limit changing coders in these
>>> types of transforms to better support runners that allow in-place updates
>>> of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>> function.
>>>> https://github.com/apache/beam/pull/8438
>>>>
>>>> If this sort of change looks good, then next step would be to use in in
>>>> Flink runner transform override. Let me know what do you think
>>>>
>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>> PTransformOverride by runner.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>> function for choosing the shard number (I assume the problem is that there
>>>>>> are cases where the current random assignment is not good?). However this
>>>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>>>> know details of the runner when writing their code.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>> of
>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>
>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>> number
>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>> that
>>>>>>> logic is buried inside WriteFiles.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>> keys.
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>> jozo.vilcek@gmail.com
>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>> `shardCount`,
>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>> >     proposing to be able to pass also a function which will allow
>>>>>>> to the
>>>>>>> >     user (or runner) control how is shard determined and what key
>>>>>>> will
>>>>>>> >     be used to represent it
>>>>>>> >
>>>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>>> extends
>>>>>>> >     Serializable {
>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>> >     shardCount: Integer);
>>>>>>> >     }
>>>>>>> >
>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>> shard
>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>> >
>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>> takes
>>>>>>> >         advantage of this to impose its own sharding if the user
>>>>>>> hasn't
>>>>>>> >         specified an explicit one. Could the Flink runner do the
>>>>>>> same
>>>>>>> >         instead of pushing this to the users?
>>>>>>> >
>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>>> >
>>>>>>> >             Hi Jozef,
>>>>>>> >
>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>> >
>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>> >             across workers
>>>>>>> >             (2) num_shards >> num_workers => good spread of the
>>>>>>> load
>>>>>>> >             across workers,
>>>>>>> >             but huge number of files
>>>>>>> >
>>>>>>> >             Your approach would give users control over the
>>>>>>> sharding
>>>>>>> >             keys such that
>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>> >
>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>> make sense.
>>>>>>> >
>>>>>>> >             Thanks,
>>>>>>> >             Max
>>>>>>> >
>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>> >              > Hello,
>>>>>>> >              >
>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>> FileIO,
>>>>>>> >             there is only one
>>>>>>> >              > option which is random (round robin) shard
>>>>>>> assignment per
>>>>>>> >             element and it
>>>>>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>>>>>> which
>>>>>>> >             follows.
>>>>>>> >              >
>>>>>>> >              > I would like to generalize this and have a
>>>>>>> possibility to
>>>>>>> >             provide some
>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>> FileIO.
>>>>>>> >              > What I am mainly after is, to have a possibility to
>>>>>>> >             provide optimisation
>>>>>>> >              > for Flink runtime and pass in a special function
>>>>>>> which
>>>>>>> >             generates shard
>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>> workers
>>>>>>> >             (BEAM-5865).
>>>>>>> >              >
>>>>>>> >              > Would such extension for FileIO make sense? If yes,
>>>>>>> I
>>>>>>> >             would create a
>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>> >              >
>>>>>>> >              > Best,
>>>>>>> >              > Jozef
>>>>>>> >
>>>>>>>
>>>>>>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
I haven't looked at the PR in depth yet, but it appears that someone
running a pipeline today who then tries to update post this PR will have
the coder change to DefaultShardKeyCoder, even if they haven't picked any
custom function. Is that correct, or am I misreading things?

Reuven

On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Hm, what would be the scenario? Have version A running with original
> random sharding and then start version B where I change sharding to some
> custom function?
> So I have to enable the pipeline to digest old keys from GBK restored
> state and also work with new keys produced to GBK going forward?
>
> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Initial thought on PR: we usually try to limit changing coders in these
>> types of transforms to better support runners that allow in-place updates
>> of pipelines. Can this be done without changing the coder?
>>
>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> I have created a PR for enhancing WriteFiles for custom sharding
>>> function.
>>> https://github.com/apache/beam/pull/8438
>>>
>>> If this sort of change looks good, then next step would be to use in in
>>> Flink runner transform override. Let me know what do you think
>>>
>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>> PTransformOverride by runner.
>>>>
>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>> function for choosing the shard number (I assume the problem is that there
>>>>> are cases where the current random assignment is not good?). However this
>>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>>> know details of the runner when writing their code.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>> of
>>>>>> shards in case of Runner-determined sharding.
>>>>>>
>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>> number
>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>> that
>>>>>> logic is buried inside WriteFiles.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>> keys.
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>>>>> > <ma...@gmail.com>> wrote:
>>>>>> >
>>>>>> >     Right now, sharding can be specified only via target
>>>>>> `shardCount`,
>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>> >     proposing to be able to pass also a function which will allow
>>>>>> to the
>>>>>> >     user (or runner) control how is shard determined and what key
>>>>>> will
>>>>>> >     be used to represent it
>>>>>> >
>>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>> extends
>>>>>> >     Serializable {
>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>> >     shardCount: Integer);
>>>>>> >     }
>>>>>> >
>>>>>> >     Default implementation can be what is right now =>  random shard
>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>> >
>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>>> >     <ma...@google.com>> wrote:
>>>>>> >
>>>>>> >         If sharding is not specified, then the semantics are
>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>> takes
>>>>>> >         advantage of this to impose its own sharding if the user
>>>>>> hasn't
>>>>>> >         specified an explicit one. Could the Flink runner do the
>>>>>> same
>>>>>> >         instead of pushing this to the users?
>>>>>> >
>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>>> >
>>>>>> >             Hi Jozef,
>>>>>> >
>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>> >
>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>> >             across workers
>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>> >             across workers,
>>>>>> >             but huge number of files
>>>>>> >
>>>>>> >             Your approach would give users control over the sharding
>>>>>> >             keys such that
>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>> >
>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>> make sense.
>>>>>> >
>>>>>> >             Thanks,
>>>>>> >             Max
>>>>>> >
>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>> >              > Hello,
>>>>>> >              >
>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>> >             there is only one
>>>>>> >              > option which is random (round robin) shard
>>>>>> assignment per
>>>>>> >             element and it
>>>>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>>>>> which
>>>>>> >             follows.
>>>>>> >              >
>>>>>> >              > I would like to generalize this and have a
>>>>>> possibility to
>>>>>> >             provide some
>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>> FileIO.
>>>>>> >              > What I am mainly after is, to have a possibility to
>>>>>> >             provide optimisation
>>>>>> >              > for Flink runtime and pass in a special function
>>>>>> which
>>>>>> >             generates shard
>>>>>> >              > keys in a way that they are evenly spread among
>>>>>> workers
>>>>>> >             (BEAM-5865).
>>>>>> >              >
>>>>>> >              > Would such extension for FileIO make sense? If yes, I
>>>>>> >             would create a
>>>>>> >              > ticket for it and try to draft a PR.
>>>>>> >              >
>>>>>> >              > Best,
>>>>>> >              > Jozef
>>>>>> >
>>>>>>
>>>>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Hm, what would be the scenario? Have version A running with the original
random sharding and then start version B where I change sharding to some
custom function?
So I would have to enable the pipeline to digest old keys from the restored
GBK state and also work with new keys produced to the GBK going forward?

On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:

> Initial thought on PR: we usually try to limit changing coders in these
> types of transforms to better support runners that allow in-place updates
> of pipelines. Can this be done without changing the coder?
>
> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> I have created a PR for enhancing WriteFiles for custom sharding function.
>> https://github.com/apache/beam/pull/8438
>>
>> If this sort of change looks good, then next step would be to use in in
>> Flink runner transform override. Let me know what do you think
>>
>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> I guess it is fine to enable shardingFn control only on WriteFiles level
>>> rather than FileIO. On WriteFiles it can be manipulated in
>>> PTransformOverride by runner.
>>>
>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Yes, a hook would have to be added to allow specifying a different
>>>> function for choosing the shard number (I assume the problem is that there
>>>> are cases where the current random assignment is not good?). However this
>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>> know details of the runner when writing their code.
>>>>
>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>>> wrote:
>>>>
>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>> of
>>>>> shards in case of Runner-determined sharding.
>>>>>
>>>>> Not sure if that would work for Jozef's case because setting the
>>>>> number
>>>>> of shards is not enough. We want to set the shard key directly and
>>>>> that
>>>>> logic is buried inside WriteFiles.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>> keys.
>>>>> >
>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>>>> > <ma...@gmail.com>> wrote:
>>>>> >
>>>>> >     Right now, sharding can be specified only via target
>>>>> `shardCount`,
>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>> >     proposing to be able to pass also a function which will allow to
>>>>> the
>>>>> >     user (or runner) control how is shard determined and what key
>>>>> will
>>>>> >     be used to represent it
>>>>> >
>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>> extends
>>>>> >     Serializable {
>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>> >     shardCount: Integer);
>>>>> >     }
>>>>> >
>>>>> >     Default implementation can be what is right now =>  random shard
>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>> >
>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>>> >     <ma...@google.com>> wrote:
>>>>> >
>>>>> >         If sharding is not specified, then the semantics are
>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>> takes
>>>>> >         advantage of this to impose its own sharding if the user
>>>>> hasn't
>>>>> >         specified an explicit one. Could the Flink runner do the same
>>>>> >         instead of pushing this to the users?
>>>>> >
>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>>> >
>>>>> >             Hi Jozef,
>>>>> >
>>>>> >             For sharding in FileIO there are basically two options:
>>>>> >
>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>> >             across workers
>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>> >             across workers,
>>>>> >             but huge number of files
>>>>> >
>>>>> >             Your approach would give users control over the sharding
>>>>> >             keys such that
>>>>> >             they could be adjusted to spread load more evenly.
>>>>> >
>>>>> >             I'd like to hear from Beam IO experts if that would make
>>>>> sense.
>>>>> >
>>>>> >             Thanks,
>>>>> >             Max
>>>>> >
>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>> >              > Hello,
>>>>> >              >
>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>> >             there is only one
>>>>> >              > option which is random (round robin) shard assignment
>>>>> per
>>>>> >             element and it
>>>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>>>> which
>>>>> >             follows.
>>>>> >              >
>>>>> >              > I would like to generalize this and have a
>>>>> possibility to
>>>>> >             provide some
>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>> >              > What I am mainly after is, to have a possibility to
>>>>> >             provide optimisation
>>>>> >              > for Flink runtime and pass in a special function which
>>>>> >             generates shard
>>>>> >              > keys in a way that they are evenly spread among
>>>>> workers
>>>>> >             (BEAM-5865).
>>>>> >              >
>>>>> >              > Would such extension for FileIO make sense? If yes, I
>>>>> >             would create a
>>>>> >              > ticket for it and try to draft a PR.
>>>>> >              >
>>>>> >              > Best,
>>>>> >              > Jozef
>>>>> >
>>>>>
>>>>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
Initial thought on PR: we usually try to limit changing coders in these
types of transforms to better support runners that allow in-place updates
of pipelines. Can this be done without changing the coder?

On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jo...@gmail.com> wrote:

> I have created a PR for enhancing WriteFiles for custom sharding function.
> https://github.com/apache/beam/pull/8438
>
> If this sort of change looks good, then next step would be to use in in
> Flink runner transform override. Let me know what do you think
>
> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> I guess it is fine to enable shardingFn control only on WriteFiles level
>> rather than FileIO. On WriteFiles it can be manipulated in
>> PTransformOverride by runner.
>>
>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Yes, a hook would have to be added to allow specifying a different
>>> function for choosing the shard number (I assume the problem is that there
>>> are cases where the current random assignment is not good?). However this
>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>> know details of the runner when writing their code.
>>>
>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> Reuven is talking about PTransformOverride, e.g.
>>>> FlinkTransformOverrides. We already use this to determine the number of
>>>> shards in case of Runner-determined sharding.
>>>>
>>>> Not sure if that would work for Jozef's case because setting the number
>>>> of shards is not enough. We want to set the shard key directly and that
>>>> logic is buried inside WriteFiles.
>>>>
>>>> -Max
>>>>
>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>> > Actually the runner is free to perform surgery on the graph. The
>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>> keys.
>>>> >
>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>>> > <ma...@gmail.com>> wrote:
>>>> >
>>>> >     Right now, sharding can be specified only via target `shardCount`,
>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>> >     proposing to be able to pass also a function which will allow to
>>>> the
>>>> >     user (or runner) control how is shard determined and what key will
>>>> >     be used to represent it
>>>> >
>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>> extends
>>>> >     Serializable {
>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>> >     shardCount: Integer);
>>>> >     }
>>>> >
>>>> >     Default implementation can be what is right now =>  random shard
>>>> >     encapsulated as ShardedKey<Integer>.
>>>> >
>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>>> >     <ma...@google.com>> wrote:
>>>> >
>>>> >         If sharding is not specified, then the semantics are
>>>> >         "runner-determined sharding." The DataflowRunner already takes
>>>> >         advantage of this to impose its own sharding if the user
>>>> hasn't
>>>> >         specified an explicit one. Could the Flink runner do the same
>>>> >         instead of pushing this to the users?
>>>> >
>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>>> >
>>>> >             Hi Jozef,
>>>> >
>>>> >             For sharding in FileIO there are basically two options:
>>>> >
>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>> >             across workers
>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>> >             across workers,
>>>> >             but huge number of files
>>>> >
>>>> >             Your approach would give users control over the sharding
>>>> >             keys such that
>>>> >             they could be adjusted to spread load more evenly.
>>>> >
>>>> >             I'd like to hear from Beam IO experts if that would make
>>>> sense.
>>>> >
>>>> >             Thanks,
>>>> >             Max
>>>> >
>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>> >              > Hello,
>>>> >              >
>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>> >             there is only one
>>>> >              > option which is random (round robin) shard assignment
>>>> per
>>>> >             element and it
>>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>>> which
>>>> >             follows.
>>>> >              >
>>>> >              > I would like to generalize this and have a possibility
>>>> to
>>>> >             provide some
>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>> >              > What I am mainly after is, to have a possibility to
>>>> >             provide optimisation
>>>> >              > for Flink runtime and pass in a special function which
>>>> >             generates shard
>>>> >              > keys in a way that they are evenly spread among workers
>>>> >             (BEAM-5865).
>>>> >              >
>>>> >              > Would such extension for FileIO make sense? If yes, I
>>>> >             would create a
>>>> >              > ticket for it and try to draft a PR.
>>>> >              >
>>>> >              > Best,
>>>> >              > Jozef
>>>> >
>>>>
>>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
I have created a PR for enhancing WriteFiles with a custom sharding function:
https://github.com/apache/beam/pull/8438

If this sort of change looks good, then the next step would be to use it in a
Flink runner transform override. Let me know what you think.
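
A rough sketch of how the hook could be wired in, assuming WriteFiles gains a
withShardingFunction(...) setter; that setter and the ShardingFunction type are
only proposed names at this point, not a merged API:

import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;

class ShardedWriteExample {
  // Sketch only: ShardingFunction is the interface proposed earlier in the
  // thread; withShardingFunction(...) is the hypothetical new setter.
  static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> buildWrite(
      FileBasedSink<UserT, DestinationT, OutputT> sink,
      ShardingFunction<UserT, DestinationT, ?> shardingFn,
      int shardCount) {
    return WriteFiles.to(sink)
        .withNumShards(shardCount)
        .withShardingFunction(shardingFn); // hypothetical setter added by the PR
  }
}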

On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jo...@gmail.com> wrote:

> I guess it is fine to enable shardingFn control only on WriteFiles level
> rather than FileIO. On WriteFiles it can be manipulated in
> PTransformOverride by runner.
>
> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>
>> Yes, a hook would have to be added to allow specifying a different
>> function for choosing the shard number (I assume the problem is that there
>> are cases where the current random assignment is not good?). However this
>> can be set using PTransformOverride, we ideally shouldn't force the user to
>> know details of the runner when writing their code.
>>
>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Reuven is talking about PTransformOverride, e.g.
>>> FlinkTransformOverrides. We already use this to determine the number of
>>> shards in case of Runner-determined sharding.
>>>
>>> Not sure if that would work for Jozef's case because setting the number
>>> of shards is not enough. We want to set the shard key directly and that
>>> logic is buried inside WriteFiles.
>>>
>>> -Max
>>>
>>> On 25.04.19 16:30, Reuven Lax wrote:
>>> > Actually the runner is free to perform surgery on the graph. The
>>> > FlinkRunner can insert a custom function to determine the sharding
>>> keys.
>>> >
>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>> > <ma...@gmail.com>> wrote:
>>> >
>>> >     Right now, sharding can be specified only via target `shardCount`,
>>> >     be it user or runner. Next to configurable shardCount, I am
>>> >     proposing to be able to pass also a function which will allow to
>>> the
>>> >     user (or runner) control how is shard determined and what key will
>>> >     be used to represent it
>>> >
>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
>>> >     Serializable {
>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>> >     shardCount: Integer);
>>> >     }
>>> >
>>> >     Default implementation can be what is right now =>  random shard
>>> >     encapsulated as ShardedKey<Integer>.
>>> >
>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>>> >     <ma...@google.com>> wrote:
>>> >
>>> >         If sharding is not specified, then the semantics are
>>> >         "runner-determined sharding." The DataflowRunner already takes
>>> >         advantage of this to impose its own sharding if the user hasn't
>>> >         specified an explicit one. Could the Flink runner do the same
>>> >         instead of pushing this to the users?
>>> >
>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>>> >
>>> >             Hi Jozef,
>>> >
>>> >             For sharding in FileIO there are basically two options:
>>> >
>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>> >             across workers
>>> >             (2) num_shards >> num_workers => good spread of the load
>>> >             across workers,
>>> >             but huge number of files
>>> >
>>> >             Your approach would give users control over the sharding
>>> >             keys such that
>>> >             they could be adjusted to spread load more evenly.
>>> >
>>> >             I'd like to hear from Beam IO experts if that would make
>>> sense.
>>> >
>>> >             Thanks,
>>> >             Max
>>> >
>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>> >              > Hello,
>>> >              >
>>> >              > Right now, if someone needs sharded files via FileIO,
>>> >             there is only one
>>> >              > option which is random (round robin) shard assignment
>>> per
>>> >             element and it
>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>> which
>>> >             follows.
>>> >              >
>>> >              > I would like to generalize this and have a possibility
>>> to
>>> >             provide some
>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>> >              > What I am mainly after is, to have a possibility to
>>> >             provide optimisation
>>> >              > for Flink runtime and pass in a special function which
>>> >             generates shard
>>> >              > keys in a way that they are evenly spread among workers
>>> >             (BEAM-5865).
>>> >              >
>>> >              > Would such extension for FileIO make sense? If yes, I
>>> >             would create a
>>> >              > ticket for it and try to draft a PR.
>>> >              >
>>> >              > Best,
>>> >              > Jozef
>>> >
>>>
>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
I guess it is fine to enable shardingFn control only at the WriteFiles level
rather than in FileIO. On WriteFiles it can be manipulated in a
PTransformOverride by the runner.

On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:

> Yes, a hook would have to be added to allow specifying a different
> function for choosing the shard number (I assume the problem is that there
> are cases where the current random assignment is not good?). However this
> can be set using PTransformOverride, we ideally shouldn't force the user to
> know details of the runner when writing their code.
>
> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Reuven is talking about PTransformOverride, e.g.
>> FlinkTransformOverrides. We already use this to determine the number of
>> shards in case of Runner-determined sharding.
>>
>> Not sure if that would work for Jozef's case because setting the number
>> of shards is not enough. We want to set the shard key directly and that
>> logic is buried inside WriteFiles.
>>
>> -Max
>>
>> On 25.04.19 16:30, Reuven Lax wrote:
>> > Actually the runner is free to perform surgery on the graph. The
>> > FlinkRunner can insert a custom function to determine the sharding keys.
>> >
>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>> > <ma...@gmail.com>> wrote:
>> >
>> >     Right now, sharding can be specified only via target `shardCount`,
>> >     be it user or runner. Next to configurable shardCount, I am
>> >     proposing to be able to pass also a function which will allow to the
>> >     user (or runner) control how is shard determined and what key will
>> >     be used to represent it
>> >
>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
>> >     Serializable {
>> >         ShardKeyT assign(DestinationT destination, UserT element,
>> >     shardCount: Integer);
>> >     }
>> >
>> >     Default implementation can be what is right now =>  random shard
>> >     encapsulated as ShardedKey<Integer>.
>> >
>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>> >     <ma...@google.com>> wrote:
>> >
>> >         If sharding is not specified, then the semantics are
>> >         "runner-determined sharding." The DataflowRunner already takes
>> >         advantage of this to impose its own sharding if the user hasn't
>> >         specified an explicit one. Could the Flink runner do the same
>> >         instead of pushing this to the users?
>> >
>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>> >         <mxm@apache.org <ma...@apache.org>> wrote:
>> >
>> >             Hi Jozef,
>> >
>> >             For sharding in FileIO there are basically two options:
>> >
>> >             (1) num_shards ~= num_workers => bad spread of the load
>> >             across workers
>> >             (2) num_shards >> num_workers => good spread of the load
>> >             across workers,
>> >             but huge number of files
>> >
>> >             Your approach would give users control over the sharding
>> >             keys such that
>> >             they could be adjusted to spread load more evenly.
>> >
>> >             I'd like to hear from Beam IO experts if that would make
>> sense.
>> >
>> >             Thanks,
>> >             Max
>> >
>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>> >              > Hello,
>> >              >
>> >              > Right now, if someone needs sharded files via FileIO,
>> >             there is only one
>> >              > option which is random (round robin) shard assignment per
>> >             element and it
>> >              > always use ShardedKey<Integer> as a key for the GBK which
>> >             follows.
>> >              >
>> >              > I would like to generalize this and have a possibility to
>> >             provide some
>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>> >              > What I am mainly after is, to have a possibility to
>> >             provide optimisation
>> >              > for Flink runtime and pass in a special function which
>> >             generates shard
>> >              > keys in a way that they are evenly spread among workers
>> >             (BEAM-5865).
>> >              >
>> >              > Would such extension for FileIO make sense? If yes, I
>> >             would create a
>> >              > ticket for it and try to draft a PR.
>> >              >
>> >              > Best,
>> >              > Jozef
>> >
>>
>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
Yes, a hook would have to be added to allow specifying a different function
for choosing the shard number (I assume the problem is that there are cases
where the current random assignment is not good?). However, this can be set
using a PTransformOverride; we ideally shouldn't force the user to know
details of the runner when writing their code.

On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <mx...@apache.org> wrote:

> Reuven is talking about PTransformOverride, e.g.
> FlinkTransformOverrides. We already use this to determine the number of
> shards in case of Runner-determined sharding.
>
> Not sure if that would work for Jozef's case because setting the number
> of shards is not enough. We want to set the shard key directly and that
> logic is buried inside WriteFiles.
>
> -Max
>
> On 25.04.19 16:30, Reuven Lax wrote:
> > Actually the runner is free to perform surgery on the graph. The
> > FlinkRunner can insert a custom function to determine the sharding keys.
> >
> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     Right now, sharding can be specified only via target `shardCount`,
> >     be it user or runner. Next to configurable shardCount, I am
> >     proposing to be able to pass also a function which will allow to the
> >     user (or runner) control how is shard determined and what key will
> >     be used to represent it
> >
> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
> >     Serializable {
> >         ShardKeyT assign(DestinationT destination, UserT element,
> >     shardCount: Integer);
> >     }
> >
> >     Default implementation can be what is right now =>  random shard
> >     encapsulated as ShardedKey<Integer>.
> >
> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
> >     <ma...@google.com>> wrote:
> >
> >         If sharding is not specified, then the semantics are
> >         "runner-determined sharding." The DataflowRunner already takes
> >         advantage of this to impose its own sharding if the user hasn't
> >         specified an explicit one. Could the Flink runner do the same
> >         instead of pushing this to the users?
> >
> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
> >         <mxm@apache.org <ma...@apache.org>> wrote:
> >
> >             Hi Jozef,
> >
> >             For sharding in FileIO there are basically two options:
> >
> >             (1) num_shards ~= num_workers => bad spread of the load
> >             across workers
> >             (2) num_shards >> num_workers => good spread of the load
> >             across workers,
> >             but huge number of files
> >
> >             Your approach would give users control over the sharding
> >             keys such that
> >             they could be adjusted to spread load more evenly.
> >
> >             I'd like to hear from Beam IO experts if that would make
> sense.
> >
> >             Thanks,
> >             Max
> >
> >             On 25.04.19 08:52, Jozef Vilcek wrote:
> >              > Hello,
> >              >
> >              > Right now, if someone needs sharded files via FileIO,
> >             there is only one
> >              > option which is random (round robin) shard assignment per
> >             element and it
> >              > always use ShardedKey<Integer> as a key for the GBK which
> >             follows.
> >              >
> >              > I would like to generalize this and have a possibility to
> >             provide some
> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
> >              > What I am mainly after is, to have a possibility to
> >             provide optimisation
> >              > for Flink runtime and pass in a special function which
> >             generates shard
> >              > keys in a way that they are evenly spread among workers
> >             (BEAM-5865).
> >              >
> >              > Would such extension for FileIO make sense? If yes, I
> >             would create a
> >              > ticket for it and try to draft a PR.
> >              >
> >              > Best,
> >              > Jozef
> >
>

Re: Custom shardingFn for FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Reuven is talking about PTransformOverride, e.g. 
FlinkTransformOverrides. We already use this to determine the number of 
shards in case of Runner-determined sharding.

Not sure if that would work for Jozef's case because setting the number 
of shards is not enough. We want to set the shard key directly and that 
logic is buried inside WriteFiles.

-Max

On 25.04.19 16:30, Reuven Lax wrote:
> Actually the runner is free to perform surgery on the graph. The 
> FlinkRunner can insert a custom function to determine the sharding keys.
> 
> On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vilcek@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Right now, sharding can be specified only via target `shardCount`,
>     be it user or runner. Next to configurable shardCount, I am
>     proposing to be able to pass also a function which will allow to the
>     user (or runner) control how is shard determined and what key will
>     be used to represent it
> 
>     interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
>     Serializable {
>         ShardKeyT assign(DestinationT destination, UserT element,
>     shardCount: Integer);
>     }
> 
>     Default implementation can be what is right now =>  random shard
>     encapsulated as ShardedKey<Integer>.
> 
>     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <relax@google.com
>     <ma...@google.com>> wrote:
> 
>         If sharding is not specified, then the semantics are
>         "runner-determined sharding." The DataflowRunner already takes
>         advantage of this to impose its own sharding if the user hasn't
>         specified an explicit one. Could the Flink runner do the same
>         instead of pushing this to the users?
> 
>         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>         <mxm@apache.org <ma...@apache.org>> wrote:
> 
>             Hi Jozef,
> 
>             For sharding in FileIO there are basically two options:
> 
>             (1) num_shards ~= num_workers => bad spread of the load
>             across workers
>             (2) num_shards >> num_workers => good spread of the load
>             across workers,
>             but huge number of files
> 
>             Your approach would give users control over the sharding
>             keys such that
>             they could be adjusted to spread load more evenly.
> 
>             I'd like to hear from Beam IO experts if that would make sense.
> 
>             Thanks,
>             Max
> 
>             On 25.04.19 08:52, Jozef Vilcek wrote:
>              > Hello,
>              >
>              > Right now, if someone needs sharded files via FileIO,
>             there is only one
>              > option which is random (round robin) shard assignment per
>             element and it
>              > always use ShardedKey<Integer> as a key for the GBK which
>             follows.
>              >
>              > I would like to generalize this and have a possibility to
>             provide some
>              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>              > What I am mainly after is, to have a possibility to
>             provide optimisation
>              > for Flink runtime and pass in a special function which
>             generates shard
>              > keys in a way that they are evenly spread among workers
>             (BEAM-5865).
>              >
>              > Would such extension for FileIO make sense? If yes, I
>             would create a
>              > ticket for it and try to draft a PR.
>              >
>              > Best,
>              > Jozef
> 

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
Actually the runner is free to perform surgery on the graph. The
FlinkRunner can insert a custom function to determine the sharding keys.

On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Right now, sharding can be specified only via target `shardCount`, be it
> user or runner. Next to configurable shardCount, I am proposing to be able
> to pass also a function which will allow to the user (or runner) control
> how is shard determined and what key will be used to represent it
>
> interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
> Serializable {
>    ShardKeyT assign(DestinationT destination, UserT element, shardCount:
> Integer);
> }
>
> Default implementation can be what is right now =>  random shard
> encapsulated as ShardedKey<Integer>.
>
>
> On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>
>> If sharding is not specified, then the semantics are "runner-determined
>> sharding." The DataflowRunner already takes advantage of this to impose its
>> own sharding if the user hasn't specified an explicit one. Could the Flink
>> runner do the same instead of pushing this to the users?
>>
>> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> For sharding in FileIO there are basically two options:
>>>
>>> (1) num_shards ~= num_workers => bad spread of the load across workers
>>> (2) num_shards >> num_workers => good spread of the load across workers,
>>> but huge number of files
>>>
>>> Your approach would give users control over the sharding keys such that
>>> they could be adjusted to spread load more evenly.
>>>
>>> I'd like to hear from Beam IO experts if that would make sense.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 25.04.19 08:52, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > Right now, if someone needs sharded files via FileIO, there is only
>>> one
>>> > option which is random (round robin) shard assignment per element and
>>> it
>>> > always use ShardedKey<Integer> as a key for the GBK which follows.
>>> >
>>> > I would like to generalize this and have a possibility to provide some
>>> > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>> > What I am mainly after is, to have a possibility to provide
>>> optimisation
>>> > for Flink runtime and pass in a special function which generates shard
>>> > keys in a way that they are evenly spread among workers (BEAM-5865).
>>> >
>>> > Would such extension for FileIO make sense? If yes, I would create a
>>> > ticket for it and try to draft a PR.
>>> >
>>> > Best,
>>> > Jozef
>>>
>>

Re: Custom shardingFn for FileIO

Posted by Jozef Vilcek <jo...@gmail.com>.
Right now, sharding can be specified only via a target `shardCount`, be it by
the user or the runner. In addition to a configurable shardCount, I am proposing
to also allow passing a function which lets the user (or runner) control how a
shard is determined and what key will be used to represent it:

interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
Serializable {
   ShardKeyT assign(DestinationT destination, UserT element, shardCount:
Integer);
}

The default implementation can be what exists right now => a random shard
encapsulated as ShardedKey<Integer>.
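
A minimal Java sketch of the shape I have in mind; the names (assignShardKey,
RandomShardingFunction) are illustrative and the exact signature would be
settled in a PR:

import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical shape of the proposed hook. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assignShardKey(DestinationT destination, UserT element, int shardCount);
}

/**
 * The default can keep today's behaviour: a random shard per element. A plain
 * Integer key is used here to keep the sketch self-contained; WriteFiles itself
 * wraps the shard number as ShardedKey<Integer>.
 */
class RandomShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assignShardKey(DestinationT destination, UserT element, int shardCount) {
    return ThreadLocalRandom.current().nextInt(shardCount);
  }
}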


On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:

> If sharding is not specified, then the semantics are "runner-determined
> sharding." The DataflowRunner already takes advantage of this to impose its
> own sharding if the user hasn't specified an explicit one. Could the Flink
> runner do the same instead of pushing this to the users?
>
> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Hi Jozef,
>>
>> For sharding in FileIO there are basically two options:
>>
>> (1) num_shards ~= num_workers => bad spread of the load across workers
>> (2) num_shards >> num_workers => good spread of the load across workers,
>> but huge number of files
>>
>> Your approach would give users control over the sharding keys such that
>> they could be adjusted to spread load more evenly.
>>
>> I'd like to hear from Beam IO experts if that would make sense.
>>
>> Thanks,
>> Max
>>
>> On 25.04.19 08:52, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > Right now, if someone needs sharded files via FileIO, there is only one
>> > option which is random (round robin) shard assignment per element and
>> it
>> > always use ShardedKey<Integer> as a key for the GBK which follows.
>> >
>> > I would like to generalize this and have a possibility to provide some
>> > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>> > What I am mainly after is, to have a possibility to provide
>> optimisation
>> > for Flink runtime and pass in a special function which generates shard
>> > keys in a way that they are evenly spread among workers (BEAM-5865).
>> >
>> > Would such extension for FileIO make sense? If yes, I would create a
>> > ticket for it and try to draft a PR.
>> >
>> > Best,
>> > Jozef
>>
>

Re: Custom shardingFn for FileIO

Posted by Reuven Lax <re...@google.com>.
If sharding is not specified, then the semantics are "runner-determined
sharding." The DataflowRunner already takes advantage of this to impose its
own sharding if the user hasn't specified an explicit one. Could the Flink
runner do the same instead of pushing this to the users?

On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Jozef,
>
> For sharding in FileIO there are basically two options:
>
> (1) num_shards ~= num_workers => bad spread of the load across workers
> (2) num_shards >> num_workers => good spread of the load across workers,
> but huge number of files
>
> Your approach would give users control over the sharding keys such that
> they could be adjusted to spread load more evenly.
>
> I'd like to hear from Beam IO experts if that would make sense.
>
> Thanks,
> Max
>
> On 25.04.19 08:52, Jozef Vilcek wrote:
> > Hello,
> >
> > Right now, if someone needs sharded files via FileIO, there is only one
> > option which is random (round robin) shard assignment per element and it
> > always use ShardedKey<Integer> as a key for the GBK which follows.
> >
> > I would like to generalize this and have a possibility to provide some
> > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
> > What I am mainly after is, to have a possibility to provide optimisation
> > for Flink runtime and pass in a special function which generates shard
> > keys in a way that they are evenly spread among workers (BEAM-5865).
> >
> > Would such extension for FileIO make sense? If yes, I would create a
> > ticket for it and try to draft a PR.
> >
> > Best,
> > Jozef
>

Re: Custom shardingFn for FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Hi Jozef,

For sharding in FileIO there are basically two options:

(1) num_shards ~= num_workers => bad spread of the load across workers
(2) num_shards >> num_workers => good spread of the load across workers, 
but huge number of files

Your approach would give users control over the sharding keys such that 
they could be adjusted to spread load more evenly.
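
As a self-contained illustration of what "adjusting the keys" could mean: pick
integer keys whose hash sends each shard to a distinct worker. The String-based
hash below is only a stand-in for the runner's real partitioner; a Flink-specific
function (the BEAM-5865 case) would have to mirror Flink's key-group assignment.
Nothing here is existing Beam or Flink API.

import java.util.HashMap;
import java.util.Map;

class EvenSpreadShardKeys {
  /** Returns one key per shard such that the stand-in partitioner spreads shards evenly. */
  static Map<Integer, Integer> pickKeys(int shardCount, int parallelism) {
    Map<Integer, Integer> shardToKey = new HashMap<>();
    int candidate = 0;
    for (int shard = 0; shard < shardCount && candidate < 1_000_000; candidate++) {
      // Stand-in partitioner: hash the candidate key and map it to a worker index.
      int worker = Math.floorMod(Integer.toString(candidate).hashCode(), parallelism);
      if (worker == shard % parallelism) {
        // Accept the first candidate that lands on the worker this shard should live on.
        shardToKey.put(shard, candidate);
        shard++;
      }
    }
    return shardToKey;
  }
}

A sharding function could then emit shardToKey.get(randomShard) instead of the
raw shard number, so the keys it produces end up evenly spread across workers.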

I'd like to hear from Beam IO experts if that would make sense.

Thanks,
Max

On 25.04.19 08:52, Jozef Vilcek wrote:
> Hello,
> 
> Right now, if someone needs sharded files via FileIO, there is only one 
> option which is random (round robin) shard assignment per element and it 
> always use ShardedKey<Integer> as a key for the GBK which follows.
> 
> I would like to generalize this and have a possibility to provide some 
> ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
> What I am mainly after is, to have a possibility to provide optimisation 
> for Flink runtime and pass in a special function which generates shard 
> keys in a way that they are evenly spread among workers (BEAM-5865).
> 
> Would such extension for FileIO make sense? If yes, I would create a 
> ticket for it and try to draft a PR.
> 
> Best,
> Jozef