You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Akshay Iyangar <ai...@godaddy.com> on 2020/06/26 20:13:48 UTC

Individual Parallelism support for Flink Runner

Hi beam community,

So I had brought this issue in our slack channel but I guess this warrants a deeper discussion and if we do go about what is the POA for it.

So basically currently for Flink Runner we don’t support operator level parallelism which native Flink provides OOTB. So I was wondering what the community feels about having some way to pass parallelism for individual operators esp.  for some of the existing IO’s

Wanted to know what people think of this.

Thanks
Akshay I

Re: Individual Parallelism support for Flink Runner

Posted by Akshay Iyangar <ai...@godaddy.com>.
Hi

As a use case we have records being fetched from Kinesis as well as S3 (Bounded) source as an unified pipeline which eventually is flattened into a single projection/output  for processing the data. But we usually end up not needing a lot of task slots / parallelism for processing data coming in from Kinesis as opposed to we need more parallelism for reading data from S3. So usually in this case we end up assigning a parallelism of X which is way more than what is actually needed for our unbounded stream in our case Kinesis. We then usually are forced to either have equal shards to distribute the load among task slots but shards = money w.r.t AWS. Or we can stick with fixed shard in which case we don’t use the compute to the fullest potential and also usually this causes errors like rate limit exceeded within kinesis itself.

So I was wondering if we can at least have a way to mention individual parallelism for some sources and sinks. I understand that individual parallelism for operators won’t be easy especially if its targeted to a specific runner .. but can the IO’s have a pipelineOption may be that is extended out of FlinkRunner’s pipelineOptions which when set can be used to set it’s parallelism or can default to the global one when the runner is flink?


Thanks
Akshay I.







From: amit kumar <ak...@gmail.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Monday, June 29, 2020 at 2:47 PM
To: "dev@beam.apache.org" <de...@beam.apache.org>
Subject: Re: Individual Parallelism support for Flink Runner

Notice: This email is from an external sender.


Looks like

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level

Regards,
Amit

On Mon, Jun 29, 2020 at 12:59 PM Kenneth Knowles <ke...@apache.org>> wrote:
This exact issue has been discussed before, though I can't find the older threads. Basically, specifying parallelism is a workaround (aka a cost), not a feature (aka a benefit). Sometimes you have to pay that cost as it is the only solution currently understood or implemented. It depends on what your reason is for having to set parallelism.

A lot of the time, the parallelism is a property of the combination of the pipeline and the data. The same pipeline with different data should have this tuned differently. For composite transforms in a library (not the top level pipeline) this is even more likely. It sounds like the suggestions here fit this case.

Some of the time, max parallelism has to do with not overwhelming another service. This depends on the particular endpoint. That is usually construction-time information. In this case you want to have portable mandatory limits.

Could you clarify your use case?

Kenn

On Mon, Jun 29, 2020 at 8:58 AM Luke Cwik <lc...@google.com>> wrote:
Check out this thread[1] about adding "runner determined sharding" as a general concept. This could be used to enhance the reshuffle implementation significantly and might remove the need for per transform parallelism from that specific use case and likely from most others.

1: https://lists.apache.org/thread.html/rfd1ca93268eb215fbbcfe098c1dfb330f1b84fb89673325135dfd9a8%40%3Cdev.beam.apache.org%3E

On Mon, Jun 29, 2020 at 4:03 AM Maximilian Michels <mx...@apache.org>> wrote:
We could allow parameterizing transforms by using transform identifiers
from the pipeline, e.g.


   options = ['--parameterize=MyTransform;parallelism=5']
   with Pipeline.create(PipelineOptions(options)) as p:
     p | Create(1, 2, 3) | 'MyTransform' >> ParDo(..)


Those hints should always be optional, such that a pipeline continues to
run on all runners.

-Max

On 28.06.20 14:30, Reuven Lax wrote:
> However such a parameter would be specific to a single transform,
> whereas maxNumWorkers is a global parameter today.
>
> On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dp...@google.com>
> <ma...@google.com>>> wrote:
>
>     I could imagine for example, a 'parallelismHint' field in the base
>     parameters that could be set to maxNumWorkers when running on
>     dataflow or an equivalent parameter when running on flink. It would
>     be useful to get a default value for the sharding in the Reshuffle
>     changes here https://github.com/apache/beam/pull/11919, but more
>     generally to have some decent guess on how to best shard work. Then
>     it would be runner-agnostic; you could set it to something like
>     numCpus on the local runner for instance.
>
>     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <re...@google.com>
>     <ma...@google.com>>> wrote:
>
>         It's an interesting question - this parameter is clearly very
>         runner specific (e.g. it would be meaningless for the Dataflow
>         runner, where parallelism is not a static constant). How should
>         we go about passing runner-specific options per transform?
>
>         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar
>         <ai...@godaddy.com> <ma...@godaddy.com>>> wrote:
>
>             Hi beam community,____
>
>             __ __
>
>             So I had brought this issue in our slack channel but I guess
>             this warrants a deeper discussion and if we do go about what
>             is the POA for it.____
>
>             __ __
>
>             So basically currently for Flink Runner we don’t support
>             operator level parallelism which native Flink provides OOTB.
>             So I was wondering what the community feels about having
>             some way to pass parallelism for individual operators esp.
>               for some of the existing IO’s ____
>
>             __ __
>
>             Wanted to know what people think of this.____
>
>             __ __
>
>             Thanks ____
>
>             Akshay I____
>

Re: Individual Parallelism support for Flink Runner

Posted by amit kumar <ak...@gmail.com>.
Looks like

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level

Regards,
Amit

On Mon, Jun 29, 2020 at 12:59 PM Kenneth Knowles <ke...@apache.org> wrote:

> This exact issue has been discussed before, though I can't find the older
> threads. Basically, specifying parallelism is a workaround (aka a cost),
> not a feature (aka a benefit). Sometimes you have to pay that cost as it is
> the only solution currently understood or implemented. It depends on what
> your reason is for having to set parallelism.
>
> A lot of the time, the parallelism is a property of the combination of the
> pipeline and the data. The same pipeline with different data should have
> this tuned differently. For composite transforms in a library (not the top
> level pipeline) this is even more likely. It sounds like the suggestions
> here fit this case.
>
> Some of the time, max parallelism has to do with not overwhelming another
> service. This depends on the particular endpoint. That is usually
> construction-time information. In this case you want to have portable
> mandatory limits.
>
> Could you clarify your use case?
>
> Kenn
>
> On Mon, Jun 29, 2020 at 8:58 AM Luke Cwik <lc...@google.com> wrote:
>
>> Check out this thread[1] about adding "runner determined sharding" as a
>> general concept. This could be used to enhance the reshuffle implementation
>> significantly and might remove the need for per transform parallelism from
>> that specific use case and likely from most others.
>>
>> 1:
>> https://lists.apache.org/thread.html/rfd1ca93268eb215fbbcfe098c1dfb330f1b84fb89673325135dfd9a8%40%3Cdev.beam.apache.org%3E
>>
>> On Mon, Jun 29, 2020 at 4:03 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> We could allow parameterizing transforms by using transform identifiers
>>> from the pipeline, e.g.
>>>
>>>
>>>    options = ['--parameterize=MyTransform;parallelism=5']
>>>    with Pipeline.create(PipelineOptions(options)) as p:
>>>      p | Create(1, 2, 3) | 'MyTransform' >> ParDo(..)
>>>
>>>
>>> Those hints should always be optional, such that a pipeline continues to
>>> run on all runners.
>>>
>>> -Max
>>>
>>> On 28.06.20 14:30, Reuven Lax wrote:
>>> > However such a parameter would be specific to a single transform,
>>> > whereas maxNumWorkers is a global parameter today.
>>> >
>>> > On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dpcollins@google.com
>>> > <ma...@google.com>> wrote:
>>> >
>>> >     I could imagine for example, a 'parallelismHint' field in the base
>>> >     parameters that could be set to maxNumWorkers when running on
>>> >     dataflow or an equivalent parameter when running on flink. It would
>>> >     be useful to get a default value for the sharding in the Reshuffle
>>> >     changes here https://github.com/apache/beam/pull/11919, but more
>>> >     generally to have some decent guess on how to best shard work. Then
>>> >     it would be runner-agnostic; you could set it to something like
>>> >     numCpus on the local runner for instance.
>>> >
>>> >     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <relax@google.com
>>> >     <ma...@google.com>> wrote:
>>> >
>>> >         It's an interesting question - this parameter is clearly very
>>> >         runner specific (e.g. it would be meaningless for the Dataflow
>>> >         runner, where parallelism is not a static constant). How should
>>> >         we go about passing runner-specific options per transform?
>>> >
>>> >         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar
>>> >         <aiyangar@godaddy.com <ma...@godaddy.com>> wrote:
>>> >
>>> >             Hi beam community,____
>>> >
>>> >             __ __
>>> >
>>> >             So I had brought this issue in our slack channel but I
>>> guess
>>> >             this warrants a deeper discussion and if we do go about
>>> what
>>> >             is the POA for it.____
>>> >
>>> >             __ __
>>> >
>>> >             So basically currently for Flink Runner we don’t support
>>> >             operator level parallelism which native Flink provides
>>> OOTB.
>>> >             So I was wondering what the community feels about having
>>> >             some way to pass parallelism for individual operators esp.
>>> >               for some of the existing IO’s ____
>>> >
>>> >             __ __
>>> >
>>> >             Wanted to know what people think of this.____
>>> >
>>> >             __ __
>>> >
>>> >             Thanks ____
>>> >
>>> >             Akshay I____
>>> >
>>>
>>

Re: Individual Parallelism support for Flink Runner

Posted by Kenneth Knowles <ke...@apache.org>.
This exact issue has been discussed before, though I can't find the older
threads. Basically, specifying parallelism is a workaround (aka a cost),
not a feature (aka a benefit). Sometimes you have to pay that cost as it is
the only solution currently understood or implemented. It depends on what
your reason is for having to set parallelism.

A lot of the time, the parallelism is a property of the combination of the
pipeline and the data. The same pipeline with different data should have
this tuned differently. For composite transforms in a library (not the top
level pipeline) this is even more likely. It sounds like the suggestions
here fit this case.

Some of the time, max parallelism has to do with not overwhelming another
service. This depends on the particular endpoint. That is usually
construction-time information. In this case you want to have portable
mandatory limits.

Could you clarify your use case?

Kenn

On Mon, Jun 29, 2020 at 8:58 AM Luke Cwik <lc...@google.com> wrote:

> Check out this thread[1] about adding "runner determined sharding" as a
> general concept. This could be used to enhance the reshuffle implementation
> significantly and might remove the need for per transform parallelism from
> that specific use case and likely from most others.
>
> 1:
> https://lists.apache.org/thread.html/rfd1ca93268eb215fbbcfe098c1dfb330f1b84fb89673325135dfd9a8%40%3Cdev.beam.apache.org%3E
>
> On Mon, Jun 29, 2020 at 4:03 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> We could allow parameterizing transforms by using transform identifiers
>> from the pipeline, e.g.
>>
>>
>>    options = ['--parameterize=MyTransform;parallelism=5']
>>    with Pipeline.create(PipelineOptions(options)) as p:
>>      p | Create(1, 2, 3) | 'MyTransform' >> ParDo(..)
>>
>>
>> Those hints should always be optional, such that a pipeline continues to
>> run on all runners.
>>
>> -Max
>>
>> On 28.06.20 14:30, Reuven Lax wrote:
>> > However such a parameter would be specific to a single transform,
>> > whereas maxNumWorkers is a global parameter today.
>> >
>> > On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dpcollins@google.com
>> > <ma...@google.com>> wrote:
>> >
>> >     I could imagine for example, a 'parallelismHint' field in the base
>> >     parameters that could be set to maxNumWorkers when running on
>> >     dataflow or an equivalent parameter when running on flink. It would
>> >     be useful to get a default value for the sharding in the Reshuffle
>> >     changes here https://github.com/apache/beam/pull/11919, but more
>> >     generally to have some decent guess on how to best shard work. Then
>> >     it would be runner-agnostic; you could set it to something like
>> >     numCpus on the local runner for instance.
>> >
>> >     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <relax@google.com
>> >     <ma...@google.com>> wrote:
>> >
>> >         It's an interesting question - this parameter is clearly very
>> >         runner specific (e.g. it would be meaningless for the Dataflow
>> >         runner, where parallelism is not a static constant). How should
>> >         we go about passing runner-specific options per transform?
>> >
>> >         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar
>> >         <aiyangar@godaddy.com <ma...@godaddy.com>> wrote:
>> >
>> >             Hi beam community,____
>> >
>> >             __ __
>> >
>> >             So I had brought this issue in our slack channel but I guess
>> >             this warrants a deeper discussion and if we do go about what
>> >             is the POA for it.____
>> >
>> >             __ __
>> >
>> >             So basically currently for Flink Runner we don’t support
>> >             operator level parallelism which native Flink provides OOTB.
>> >             So I was wondering what the community feels about having
>> >             some way to pass parallelism for individual operators esp.
>> >               for some of the existing IO’s ____
>> >
>> >             __ __
>> >
>> >             Wanted to know what people think of this.____
>> >
>> >             __ __
>> >
>> >             Thanks ____
>> >
>> >             Akshay I____
>> >
>>
>

Re: Individual Parallelism support for Flink Runner

Posted by Luke Cwik <lc...@google.com>.
Check out this thread[1] about adding "runner determined sharding" as a
general concept. This could be used to enhance the reshuffle implementation
significantly and might remove the need for per transform parallelism from
that specific use case and likely from most others.

1:
https://lists.apache.org/thread.html/rfd1ca93268eb215fbbcfe098c1dfb330f1b84fb89673325135dfd9a8%40%3Cdev.beam.apache.org%3E

On Mon, Jun 29, 2020 at 4:03 AM Maximilian Michels <mx...@apache.org> wrote:

> We could allow parameterizing transforms by using transform identifiers
> from the pipeline, e.g.
>
>
>    options = ['--parameterize=MyTransform;parallelism=5']
>    with Pipeline.create(PipelineOptions(options)) as p:
>      p | Create(1, 2, 3) | 'MyTransform' >> ParDo(..)
>
>
> Those hints should always be optional, such that a pipeline continues to
> run on all runners.
>
> -Max
>
> On 28.06.20 14:30, Reuven Lax wrote:
> > However such a parameter would be specific to a single transform,
> > whereas maxNumWorkers is a global parameter today.
> >
> > On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dpcollins@google.com
> > <ma...@google.com>> wrote:
> >
> >     I could imagine for example, a 'parallelismHint' field in the base
> >     parameters that could be set to maxNumWorkers when running on
> >     dataflow or an equivalent parameter when running on flink. It would
> >     be useful to get a default value for the sharding in the Reshuffle
> >     changes here https://github.com/apache/beam/pull/11919, but more
> >     generally to have some decent guess on how to best shard work. Then
> >     it would be runner-agnostic; you could set it to something like
> >     numCpus on the local runner for instance.
> >
> >     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <relax@google.com
> >     <ma...@google.com>> wrote:
> >
> >         It's an interesting question - this parameter is clearly very
> >         runner specific (e.g. it would be meaningless for the Dataflow
> >         runner, where parallelism is not a static constant). How should
> >         we go about passing runner-specific options per transform?
> >
> >         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar
> >         <aiyangar@godaddy.com <ma...@godaddy.com>> wrote:
> >
> >             Hi beam community,____
> >
> >             __ __
> >
> >             So I had brought this issue in our slack channel but I guess
> >             this warrants a deeper discussion and if we do go about what
> >             is the POA for it.____
> >
> >             __ __
> >
> >             So basically currently for Flink Runner we don’t support
> >             operator level parallelism which native Flink provides OOTB.
> >             So I was wondering what the community feels about having
> >             some way to pass parallelism for individual operators esp.
> >               for some of the existing IO’s ____
> >
> >             __ __
> >
> >             Wanted to know what people think of this.____
> >
> >             __ __
> >
> >             Thanks ____
> >
> >             Akshay I____
> >
>

Re: Individual Parallelism support for Flink Runner

Posted by Maximilian Michels <mx...@apache.org>.
We could allow parameterizing transforms by using transform identifiers 
from the pipeline, e.g.


   options = ['--parameterize=MyTransform;parallelism=5']
   with Pipeline.create(PipelineOptions(options)) as p:
     p | Create(1, 2, 3) | 'MyTransform' >> ParDo(..)


Those hints should always be optional, such that a pipeline continues to 
run on all runners.

-Max

On 28.06.20 14:30, Reuven Lax wrote:
> However such a parameter would be specific to a single transform, 
> whereas maxNumWorkers is a global parameter today.
> 
> On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dpcollins@google.com 
> <ma...@google.com>> wrote:
> 
>     I could imagine for example, a 'parallelismHint' field in the base
>     parameters that could be set to maxNumWorkers when running on
>     dataflow or an equivalent parameter when running on flink. It would
>     be useful to get a default value for the sharding in the Reshuffle
>     changes here https://github.com/apache/beam/pull/11919, but more
>     generally to have some decent guess on how to best shard work. Then
>     it would be runner-agnostic; you could set it to something like
>     numCpus on the local runner for instance.
> 
>     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <relax@google.com
>     <ma...@google.com>> wrote:
> 
>         It's an interesting question - this parameter is clearly very
>         runner specific (e.g. it would be meaningless for the Dataflow
>         runner, where parallelism is not a static constant). How should
>         we go about passing runner-specific options per transform?
> 
>         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar
>         <aiyangar@godaddy.com <ma...@godaddy.com>> wrote:
> 
>             Hi beam community,____
> 
>             __ __
> 
>             So I had brought this issue in our slack channel but I guess
>             this warrants a deeper discussion and if we do go about what
>             is the POA for it.____
> 
>             __ __
> 
>             So basically currently for Flink Runner we don’t support
>             operator level parallelism which native Flink provides OOTB.
>             So I was wondering what the community feels about having
>             some way to pass parallelism for individual operators esp.
>               for some of the existing IO’s ____
> 
>             __ __
> 
>             Wanted to know what people think of this.____
> 
>             __ __
> 
>             Thanks ____
> 
>             Akshay I____
> 

Re: Individual Parallelism support for Flink Runner

Posted by Reuven Lax <re...@google.com>.
However such a parameter would be specific to a single transform,
whereas maxNumWorkers is a global parameter today.

On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dp...@google.com>
wrote:

> I could imagine for example, a 'parallelismHint' field in the base
> parameters that could be set to maxNumWorkers when running on dataflow or
> an equivalent parameter when running on flink. It would be useful to get a
> default value for the sharding in the Reshuffle changes here
> https://github.com/apache/beam/pull/11919, but more generally to have
> some decent guess on how to best shard work. Then it would be
> runner-agnostic; you could set it to something like numCpus on the local
> runner for instance.
>
> On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <re...@google.com> wrote:
>
>> It's an interesting question - this parameter is clearly very runner
>> specific (e.g. it would be meaningless for the Dataflow runner, where
>> parallelism is not a static constant). How should we go about passing
>> runner-specific options per transform?
>>
>> On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar <ai...@godaddy.com>
>> wrote:
>>
>>> Hi beam community,
>>>
>>>
>>>
>>> So I had brought this issue in our slack channel but I guess this
>>> warrants a deeper discussion and if we do go about what is the POA for it.
>>>
>>>
>>>
>>> So basically currently for Flink Runner we don’t support operator level
>>> parallelism which native Flink provides OOTB. So I was wondering what the
>>> community feels about having some way to pass parallelism for individual
>>> operators esp.  for some of the existing IO’s
>>>
>>>
>>>
>>> Wanted to know what people think of this.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Akshay I
>>>
>>

Re: Individual Parallelism support for Flink Runner

Posted by Daniel Collins <dp...@google.com>.
I could imagine for example, a 'parallelismHint' field in the base
parameters that could be set to maxNumWorkers when running on dataflow or
an equivalent parameter when running on flink. It would be useful to get a
default value for the sharding in the Reshuffle changes here
https://github.com/apache/beam/pull/11919, but more generally to have some
decent guess on how to best shard work. Then it would be runner-agnostic;
you could set it to something like numCpus on the local runner for instance.

On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <re...@google.com> wrote:

> It's an interesting question - this parameter is clearly very runner
> specific (e.g. it would be meaningless for the Dataflow runner, where
> parallelism is not a static constant). How should we go about passing
> runner-specific options per transform?
>
> On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar <ai...@godaddy.com>
> wrote:
>
>> Hi beam community,
>>
>>
>>
>> So I had brought this issue in our slack channel but I guess this
>> warrants a deeper discussion and if we do go about what is the POA for it.
>>
>>
>>
>> So basically currently for Flink Runner we don’t support operator level
>> parallelism which native Flink provides OOTB. So I was wondering what the
>> community feels about having some way to pass parallelism for individual
>> operators esp.  for some of the existing IO’s
>>
>>
>>
>> Wanted to know what people think of this.
>>
>>
>>
>> Thanks
>>
>> Akshay I
>>
>

Re: Individual Parallelism support for Flink Runner

Posted by Reuven Lax <re...@google.com>.
It's an interesting question - this parameter is clearly very runner
specific (e.g. it would be meaningless for the Dataflow runner, where
parallelism is not a static constant). How should we go about passing
runner-specific options per transform?

On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar <ai...@godaddy.com> wrote:

> Hi beam community,
>
>
>
> So I had brought this issue in our slack channel but I guess this warrants
> a deeper discussion and if we do go about what is the POA for it.
>
>
>
> So basically currently for Flink Runner we don’t support operator level
> parallelism which native Flink provides OOTB. So I was wondering what the
> community feels about having some way to pass parallelism for individual
> operators esp.  for some of the existing IO’s
>
>
>
> Wanted to know what people think of this.
>
>
>
> Thanks
>
> Akshay I
>