You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Ning Kang via dev <de...@beam.apache.org> on 2023/04/21 17:37:16 UTC

Re: Is there any way to set the parallelism of operators like group by, join?

Hi Jan,

To generalize the per-stage parallelism configuration, we should have a FR
proposing the capability to explicitly set autoscaling (in this case, fixed
size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, thus making configuration of
the parallelism/worker-per-stage not straightforward.
Flink's parallelism settings can be found here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
it's still kind of a black box since you don't really know how many tasks
are actually spawned until you run a pipeline.

That being said, if we have a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in their own way.
For example, in a Flink job, each operator/stage's task slot is prorated by
their key numbers; the maximum parallelism is throttled by task slot
utilization.
Another example, in a Dataflow job, each stage horizontally scales by CPU
utilization; vertically scales by memory/disk utilization.

+dev@beam.apache.org <de...@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we talk about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using --streaming command-line argument. But yes,
> this might have other implications. For batch only it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not an transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators. And
> the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators resources available in the cluster.
>
> Another downside is wasted resources for other smaller operators who
> cannot make full use of task slots assigned to them. You might see only
> 1/10 tasks running while the other 9/10 tasks idle for an operator with
> parallelism 10, especially when it's doing some aggregation like a SUM.
>
> One redeeming method is that, for operators following another operator
> with high fanout, we can explicitly add a Reshuffle to allow a higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it limits the scalability of your
> pipeline significantly.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) is be possible, but would be somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually bring any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline and this seems to work just fine. Is there any particular
>> need or measurable impact of such approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using Flink runner. We are processing a pcollection
>> (extracting features per element) then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the groupby should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism for operators, as I mentioned
>>> before, there may be multiple groupby, join operators in one pipeline, and
>>> their parallelism can be different due to different input data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>> such kind of mechanism, so I am looking for a general solution on the beam
>>>>>>> side.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To a (small) degree Sparks “new” AQE might be able to help
>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>
>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>>>>>> operators.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at compiling
>>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Kenneth Knowles <ke...@apache.org>.
FWIW I think parallelism is close enough to a resource. If you phrased it
like "how many CPUs can work independently" it is more closely related to
resources. Just like how many bits it takes to encode something is a
semantic property, but "RAM" is a resource.

I think a big role of resource hints is to be a bridge between the Beam
Model, which tries hard to only include essential information, to a
particular implementation which may not be able to autotune various
inessential/implementation details. Specifying parallelism to a runner that
still requires manual tuning of that seems like a fine use of this.

Kenn

On Fri, Apr 21, 2023 at 11:30 AM Jan Lukavský <je...@seznam.cz> wrote:

> Absolutely agree this is not something that should be part of the model.
> The ResourceHints is good place, but given how Pipeline might get fused
> (and though this might be under the control of a runner, basically all
> runners use the same code, because there is currently no reason why this
> should be runner-specifiic), there is a problem with how to resolve
> conflicting settings. Also it is somewhat questionable if parallelism is a
> "resource". It feels more like a runtime property. I tend to think that
> FlinkPipelineOptions could be a good place for that, because this seems to
> apply (mostly) to Flink batch runner.
> On 4/21/23 19:43, Robert Bradshaw via dev wrote:
>
> +1 to not requiring details like this in the Beam model. There is,
> however, the question of how to pass such implementation-detail specific
> hints to a runner that requires them. Generally that's done via
> ResourceHints or annotations, and while the former seems a good fit it's
> primarily focused on setting up the right context for user code (which GBK
> is not).
>
> A complete hack is to add an experiment like
> flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
> cleaner.
>
>
> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <us...@beam.apache.org>
> wrote:
>
>> Hi Jan,
>>
>> To generalize the per-stage parallelism configuration, we should have a
>> FR proposing the capability to explicitly set autoscaling (in this case,
>> fixed size per stage) policy in Beam pipelines.
>>
>> Per-step or per-stage parallelism, or fusion/optimization is not part of
>> the Beam model. They are [Flink] runner implementation details and should
>> be configured for each runner.
>>
>> Also, when building the pipeline, it's not clear what the fusion looks
>> like until the pipeline is submitted to a runner, thus making configuration
>> of the parallelism/worker-per-stage not straightforward.
>> Flink's parallelism settings can be found here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
>> it's still kind of a black box since you don't really know how many tasks
>> are actually spawned until you run a pipeline.
>>
>> That being said, if we have a general interface controlling how a
>> pipeline scales, each runner could adapt [auto]scaling in their own way.
>> For example, in a Flink job, each operator/stage's task slot is prorated
>> by their key numbers; the maximum parallelism is throttled by task slot
>> utilization.
>> Another example, in a Dataflow job, each stage horizontally scales by CPU
>> utilization; vertically scales by memory/disk utilization.
>>
>> +dev@beam.apache.org <de...@beam.apache.org>
>> Let's use this thread to discuss how to configure a pipeline for runners
>> so that they can scale workers appropriately without exposing
>> runner-specific details to the Beam model.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Ning,
>>>
>>> I might have missed that in the discussion, but we talk about batch
>>> execution, am I right? In streaming, all operators (PTransforms) of a
>>> Pipeline are run in the same slots, thus the downsides are limited. You can
>>> enforce streaming mode using --streaming command-line argument. But yes,
>>> this might have other implications. For batch only it obviously makes sense
>>> to limit parallelism of a (fused) 'stage', which is not an transform-level
>>> concept, but rather a more complex union of transforms divided by shuffle
>>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>>> list for this for deeper discussion?
>>>
>>>  Jan
>>> On 4/20/23 19:18, Ning Kang via user wrote:
>>>
>>> Hi Jan,
>>>
>>> The approach works when your pipeline doesn't have too many operators.
>>> And the operator that needs the highest parallelism can only use at most
>>> #total_task_slots / #operators resources available in the cluster.
>>>
>>> Another downside is wasted resources for other smaller operators who
>>> cannot make full use of task slots assigned to them. You might see only
>>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>>
>>> One redeeming method is that, for operators following another operator
>>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>>> parallelism. But this circles back to the first downside: if your pipeline
>>> has exponentially high fanout through it, setting a single parallelism for
>>> the whole pipeline is not ideal because it limits the scalability of your
>>> pipeline significantly.
>>>
>>> Ning.
>>>
>>>
>>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> this topic was discussed many years ago and the conclusion there was
>>>> that setting the parallelism of individual operators via
>>>> FlinkPipelineOptions (or ResourceHints) is be possible, but would be
>>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>>> high parallelism for operators with small inputs, does this actually bring
>>>> any relevant performance impact? I always use parallelism based on the
>>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>>> any particular need or measurable impact of such approach?
>>>>
>>>>  Jan
>>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>>
>>>> Same need here, using Flink runner. We are processing a pcollection
>>>> (extracting features per element) then combining these into groups of
>>>> features and running the next operator on those groups.
>>>>
>>>> Each group contains ~50 elements, so the parallelism of the operator
>>>> upstream of the groupby should be higher, to be balanced with the
>>>> downstream operator.
>>>>
>>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> It would be better to set parallelism for operators, as I mentioned
>>>>> before, there may be multiple groupby, join operators in one pipeline, and
>>>>> their parallelism can be different due to different input data sizes.
>>>>>
>>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Jeff - does setting the global default work for you, or do you need
>>>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>>>
>>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>>> operators.
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>>> such kind of mechanism, so I am looking for a general solution on the beam
>>>>>>>>> side.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <
>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> To a (small) degree Sparks “new” AQE might be able to help
>>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>>
>>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>>>>> operator runs, so there is no need to have such controls. In fact these
>>>>>>>>>>>>> specific controls wouldn't make much sense for the way Dataflow implements
>>>>>>>>>>>>> these operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at compiling
>>>>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <
>>>>>>>>>>>>>>>>> zjffdu@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jan Lukavský <je...@seznam.cz>.
Absolutely agree this is not something that should be part of the model. 
The ResourceHints is good place, but given how Pipeline might get fused 
(and though this might be under the control of a runner, basically all 
runners use the same code, because there is currently no reason why this 
should be runner-specifiic), there is a problem with how to resolve 
conflicting settings. Also it is somewhat questionable if parallelism is 
a "resource". It feels more like a runtime property. I tend to think 
that FlinkPipelineOptions could be a good place for that, because this 
seems to apply (mostly) to Flink batch runner.

On 4/21/23 19:43, Robert Bradshaw via dev wrote:
> +1 to not requiring details like this in the Beam model. There is, 
> however, the question of how to pass such implementation-detail 
> specific hints to a runner that requires them. Generally that's done 
> via ResourceHints or annotations, and while the former seems a good 
> fit it's primarily focused on setting up the right context for user 
> code (which GBK is not).
>
> A complete hack is to add an experiment like 
> flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do 
> something cleaner.
>
>
> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user 
> <us...@beam.apache.org> wrote:
>
>     Hi Jan,
>
>     To generalize the per-stage parallelism configuration, we should
>     have a FR proposing the capability to explicitly set autoscaling
>     (in this case, fixed size per stage) policy in Beam pipelines.
>
>     Per-step or per-stage parallelism, or fusion/optimization is not
>     part of the Beam model. They are [Flink] runner implementation
>     details and should be configured for each runner.
>
>     Also, when building the pipeline, it's not clear what the fusion
>     looks like until the pipeline is submitted to a runner, thus
>     making configuration of the parallelism/worker-per-stage not
>     straightforward.
>     Flink's parallelism settings can be found here
>     <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
>     it's still kind of a black box since you don't really know how
>     many tasks are actually spawned until you run a pipeline.
>
>     That being said, if we have a general interface controlling how a
>     pipeline scales, each runner could adapt [auto]scaling in their
>     own way.
>     For example, in a Flink job, each operator/stage's task slot is
>     prorated by their key numbers; the maximum parallelism is
>     throttled by task slot utilization.
>     Another example, in a Dataflow job, each stage horizontally scales
>     by CPU utilization; vertically scales by memory/disk utilization.
>
>     +dev@beam.apache.org <ma...@beam.apache.org>
>     Let's use this thread to discuss how to configure a pipeline for
>     runners so that they can scale workers appropriately without
>     exposing runner-specific details to the Beam model.
>
>     Ning.
>
>
>     On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>         Hi Ning,
>
>         I might have missed that in the discussion, but we talk about
>         batch execution, am I right? In streaming, all operators
>         (PTransforms) of a Pipeline are run in the same slots, thus
>         the downsides are limited. You can enforce streaming mode
>         using --streaming command-line argument. But yes, this might
>         have other implications. For batch only it obviously makes
>         sense to limit parallelism of a (fused) 'stage', which is not
>         an transform-level concept, but rather a more complex union of
>         transforms divided by shuffle barrier. Would you be willing to
>         start a follow-up thread in @dev mailing list for this for
>         deeper discussion?
>
>          Jan
>
>         On 4/20/23 19:18, Ning Kang via user wrote:
>>         Hi Jan,
>>
>>         The approach works when your pipeline doesn't have too many
>>         operators. And the operator that needs the highest
>>         parallelism can only use at most #total_task_slots /
>>         #operators resources available in the cluster.
>>
>>         Another downside is wasted resources for other smaller
>>         operators who cannot make full use of task slots assigned to
>>         them. You might see only 1/10 tasks running while the other
>>         9/10 tasks idle for an operator with parallelism 10,
>>         especially when it's doing some aggregation like a SUM.
>>
>>         One redeeming method is that, for operators following another
>>         operator with high fanout, we can explicitly add a Reshuffle
>>         to allow a higher parallelism. But this circles back to the
>>         first downside: if your pipeline has exponentially high
>>         fanout through it, setting a single parallelism for the whole
>>         pipeline is not ideal because it limits the scalability of
>>         your pipeline significantly.
>>
>>         Ning.
>>
>>
>>         On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský
>>         <je...@seznam.cz> wrote:
>>
>>             Hi,
>>
>>             this topic was discussed many years ago and the
>>             conclusion there was that setting the parallelism of
>>             individual operators via FlinkPipelineOptions (or
>>             ResourceHints) is be possible, but would be somewhat
>>             cumbersome. Although I understand that it "feels" weird
>>             to have high parallelism for operators with small inputs,
>>             does this actually bring any relevant performance impact?
>>             I always use parallelism based on the largest operator in
>>             the Pipeline and this seems to work just fine. Is there
>>             any particular need or measurable impact of such approach?
>>
>>              Jan
>>
>>             On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>             Same need here, using Flink runner. We are processing a
>>>             pcollection (extracting features per element) then
>>>             combining these into groups of features and running the
>>>             next operator on those groups.
>>>
>>>             Each group contains ~50 elements, so the parallelism of
>>>             the operator upstream of the groupby should be higher,
>>>             to be balanced with the downstream operator.
>>>
>>>             On Tue, Apr 18, 2023 at 19:17 Jeff Zhang
>>>             <zj...@gmail.com> wrote:
>>>
>>>                 Hi Reuven,
>>>
>>>                 It would be better to set parallelism for operators,
>>>                 as I mentioned before, there may be multiple
>>>                 groupby, join operators in one pipeline, and their
>>>                 parallelism can be different due to different input
>>>                 data sizes.
>>>
>>>                 On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax
>>>                 <re...@google.com> wrote:
>>>
>>>                     Jeff - does setting the global default work for
>>>                     you, or do you need per-operator control? Seems
>>>                     like it would be to add this to ResourceHints.
>>>
>>>                     On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
>>>                     <ro...@google.com> wrote:
>>>
>>>                         Yeah, I don't think we have a good
>>>                         per-operator API for this. If we were to add
>>>                         it, it probably belongs in ResourceHints.
>>>
>>>                         On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
>>>                         <re...@google.com> wrote:
>>>
>>>                             Looking at FlinkPipelineOptions, there
>>>                             is a parallelism option you can set. I
>>>                             believe this sets the default
>>>                             parallelism for all Flink operators.
>>>
>>>                             On Sun, Apr 16, 2023 at 7:20 PM Jeff
>>>                             Zhang <zj...@gmail.com> wrote:
>>>
>>>                                 Thanks Holden, this would work for
>>>                                 Spark, but Flink doesn't have such
>>>                                 kind of mechanism, so I am looking
>>>                                 for a general solution on the beam side.
>>>
>>>                                 On Mon, Apr 17, 2023 at 10:08 AM
>>>                                 Holden Karau <ho...@pigscanfly.ca>
>>>                                 wrote:
>>>
>>>                                     To a (small) degree Sparks “new”
>>>                                     AQE might be able to help
>>>                                     depending on what kind of
>>>                                     operations Beam is compiling it
>>>                                     down to.
>>>
>>>                                     Have you tried setting
>>>                                     spark.sql.adaptive.enabled &
>>>                                     spark.sql.adaptive.coalescePartitions.enabled
>>>
>>>
>>>
>>>                                     On Mon, Apr 17, 2023 at 10:34 AM
>>>                                     Reuven Lax via user
>>>                                     <us...@beam.apache.org> wrote:
>>>
>>>                                         I see. Robert - what is the
>>>                                         story for parallelism
>>>                                         controls on GBK with the
>>>                                         Spark or Flink runners?
>>>
>>>                                         On Sun, Apr 16, 2023 at
>>>                                         6:24 PM Jeff Zhang
>>>                                         <zj...@gmail.com> wrote:
>>>
>>>                                             No, I don't use
>>>                                             dataflow, I use Spark &
>>>                                             Flink.
>>>
>>>
>>>                                             On Mon, Apr 17, 2023 at
>>>                                             8:08 AM Reuven Lax
>>>                                             <re...@google.com> wrote:
>>>
>>>                                                 Are you running on
>>>                                                 the Dataflow runner?
>>>                                                 If so, Dataflow -
>>>                                                 unlike Spark and
>>>                                                 Flink - dynamically
>>>                                                 modifies the
>>>                                                 parallelism as the
>>>                                                 operator runs, so
>>>                                                 there is no need to
>>>                                                 have such controls.
>>>                                                 In fact these
>>>                                                 specific controls
>>>                                                 wouldn't make much
>>>                                                 sense for the way
>>>                                                 Dataflow implements
>>>                                                 these operators.
>>>
>>>                                                 On Sun, Apr 16, 2023
>>>                                                 at 12:25 AM Jeff
>>>                                                 Zhang
>>>                                                 <zj...@gmail.com>
>>>                                                 wrote:
>>>
>>>                                                     Just for
>>>                                                     performance tuning like
>>>                                                     in Spark and Flink.
>>>
>>>
>>>                                                     On Sun, Apr 16,
>>>                                                     2023 at 1:10 PM
>>>                                                     Robert Bradshaw
>>>                                                     via user
>>>                                                     <us...@beam.apache.org>
>>>                                                     wrote:
>>>
>>>                                                         What are you
>>>                                                         trying to
>>>                                                         achieve by
>>>                                                         setting the
>>>                                                         parallelism?
>>>
>>>                                                         On Sat, Apr
>>>                                                         15, 2023 at
>>>                                                         5:13 PM Jeff
>>>                                                         Zhang
>>>                                                         <zj...@gmail.com>
>>>                                                         wrote:
>>>
>>>                                                             Thanks
>>>                                                             Reuven,
>>>                                                             what I
>>>                                                             mean is
>>>                                                             to set
>>>                                                             the
>>>                                                             parallelism
>>>                                                             in
>>>                                                             operator
>>>                                                             level.
>>>                                                             And the
>>>                                                             input
>>>                                                             size of
>>>                                                             the
>>>                                                             operator
>>>                                                             is
>>>                                                             unknown
>>>                                                             at
>>>                                                             compiling
>>>                                                             stage if
>>>                                                             it is
>>>                                                             not a
>>>                                                             source
>>>                                                              operator,
>>>
>>>                                                             Here's
>>>                                                             an
>>>                                                             example
>>>                                                             of flink
>>>                                                             https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>                                                             Spark also
>>>                                                             support
>>>                                                             to set
>>>                                                             operator
>>>                                                             level
>>>                                                             parallelism
>>>                                                             (see
>>>                                                             groupByKey
>>>                                                             and
>>>                                                             reduceByKey):
>>>                                                             https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>
>>>
>>>                                                             On Sun,
>>>                                                             Apr 16,
>>>                                                             2023 at
>>>                                                             1:42 AM
>>>                                                             Reuven
>>>                                                             Lax via
>>>                                                             user
>>>                                                             <us...@beam.apache.org>
>>>                                                             wrote:
>>>
>>>                                                                 The
>>>                                                                 maximum
>>>                                                                 parallelism is
>>>                                                                 always
>>>                                                                 determined
>>>                                                                 by
>>>                                                                 the
>>>                                                                 parallelism
>>>                                                                 of
>>>                                                                 your
>>>                                                                 data.
>>>                                                                 If
>>>                                                                 you
>>>                                                                 do a
>>>                                                                 GroupByKey
>>>                                                                 for
>>>                                                                 example,
>>>                                                                 the
>>>                                                                 number
>>>                                                                 of
>>>                                                                 keys
>>>                                                                 in
>>>                                                                 your
>>>                                                                 data
>>>                                                                 determines
>>>                                                                 the
>>>                                                                 maximum
>>>                                                                 parallelism.
>>>
>>>
>>>                                                                 Beyond
>>>                                                                 the
>>>                                                                 limitations
>>>                                                                 in
>>>                                                                 your
>>>                                                                 data,
>>>                                                                 it
>>>                                                                 depends
>>>                                                                 on
>>>                                                                 your
>>>                                                                 execution
>>>                                                                 engine.
>>>                                                                 If
>>>                                                                 you're
>>>                                                                 using
>>>                                                                 Dataflow,
>>>                                                                 Dataflow
>>>                                                                 is
>>>                                                                 designed
>>>                                                                 to
>>>                                                                 automatically
>>>                                                                 determine
>>>                                                                 the
>>>                                                                 parallelism
>>>                                                                 (e.g.
>>>                                                                 work
>>>                                                                 will
>>>                                                                 be
>>>                                                                 dynamically
>>>                                                                 split
>>>                                                                 and
>>>                                                                 moved
>>>                                                                 around
>>>                                                                 between
>>>                                                                 workers,
>>>                                                                 the
>>>                                                                 number
>>>                                                                 of
>>>                                                                 workers
>>>                                                                 will
>>>                                                                 autoscale,
>>>                                                                 etc.),
>>>                                                                 so
>>>                                                                 there's
>>>                                                                 no
>>>                                                                 need
>>>                                                                 to
>>>                                                                 explicitly
>>>                                                                 set
>>>                                                                 the
>>>                                                                 parallelism
>>>                                                                 of
>>>                                                                 the
>>>                                                                 execution.
>>>
>>>                                                                 On
>>>                                                                 Sat,
>>>                                                                 Apr
>>>                                                                 15,
>>>                                                                 2023
>>>                                                                 at
>>>                                                                 1:12 AM
>>>                                                                 Jeff
>>>                                                                 Zhang
>>>                                                                 <zj...@gmail.com>
>>>                                                                 wrote:
>>>
>>>                                                                     Besides
>>>                                                                     the
>>>                                                                     global
>>>                                                                     parallelism of
>>>                                                                     beam
>>>                                                                     job,
>>>                                                                     is
>>>                                                                     there
>>>                                                                     any
>>>                                                                     way
>>>                                                                     to
>>>                                                                     set
>>>                                                                     parallelism for
>>>                                                                     individual
>>>                                                                     operators
>>>                                                                     like
>>>                                                                     group
>>>                                                                     by
>>>                                                                     and
>>>                                                                     join?
>>>                                                                     I
>>>                                                                     understand the
>>>                                                                     parallelism setting
>>>                                                                     depends
>>>                                                                     on
>>>                                                                     the
>>>                                                                     underlying
>>>                                                                     execution
>>>                                                                     engine,
>>>                                                                     but
>>>                                                                     it
>>>                                                                     is
>>>                                                                     very
>>>                                                                     common
>>>                                                                     to
>>>                                                                     set
>>>                                                                     parallelism like
>>>                                                                     group
>>>                                                                     by
>>>                                                                     and
>>>                                                                     join
>>>                                                                     in
>>>                                                                     both
>>>                                                                     spark
>>>                                                                     &
>>>                                                                     flink.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>                                                                     -- 
>>>                                                                     Best
>>>                                                                     Regards
>>>
>>>                                                                     Jeff
>>>                                                                     Zhang
>>>
>>>
>>>
>>>                                                             -- 
>>>                                                             Best Regards
>>>
>>>                                                             Jeff Zhang
>>>
>>>
>>>
>>>                                                     -- 
>>>                                                     Best Regards
>>>
>>>                                                     Jeff Zhang
>>>
>>>
>>>
>>>                                             -- 
>>>                                             Best Regards
>>>
>>>                                             Jeff Zhang
>>>
>>>                                     -- 
>>>                                     Twitter:
>>>                                     https://twitter.com/holdenkarau
>>>                                     Books (Learning Spark, High
>>>                                     Performance Spark, etc.):
>>>                                     https://amzn.to/2MaRAG9
>>>                                     <https://amzn.to/2MaRAG9>
>>>                                     YouTube Live Streams:
>>>                                     https://www.youtube.com/user/holdenkarau
>>>
>>>
>>>
>>>                                 -- 
>>>                                 Best Regards
>>>
>>>                                 Jeff Zhang
>>>
>>>
>>>
>>>                 -- 
>>>                 Best Regards
>>>
>>>                 Jeff Zhang
>>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
+1 to not requiring details like this in the Beam model. There is, however,
the question of how to pass such implementation-detail specific hints to a
runner that requires them. Generally that's done via ResourceHints or
annotations, and while the former seems a good fit it's primarily focused
on setting up the right context for user code (which GBK is not).

A complete hack is to add an experiment like
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
cleaner.


On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <us...@beam.apache.org>
wrote:

> Hi Jan,
>
> To generalize the per-stage parallelism configuration, we should have a FR
> proposing the capability to explicitly set autoscaling (in this case, fixed
> size per stage) policy in Beam pipelines.
>
> Per-step or per-stage parallelism, or fusion/optimization is not part of
> the Beam model. They are [Flink] runner implementation details and should
> be configured for each runner.
>
> Also, when building the pipeline, it's not clear what the fusion looks
> like until the pipeline is submitted to a runner, thus making configuration
> of the parallelism/worker-per-stage not straightforward.
> Flink's parallelism settings can be found here
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
> it's still kind of a black box since you don't really know how many tasks
> are actually spawned until you run a pipeline.
>
> That being said, if we have a general interface controlling how a pipeline
> scales, each runner could adapt [auto]scaling in their own way.
> For example, in a Flink job, each operator/stage's task slot is prorated
> by their key numbers; the maximum parallelism is throttled by task slot
> utilization.
> Another example, in a Dataflow job, each stage horizontally scales by CPU
> utilization; vertically scales by memory/disk utilization.
>
> +dev@beam.apache.org <de...@beam.apache.org>
> Let's use this thread to discuss how to configure a pipeline for runners
> so that they can scale workers appropriately without exposing
> runner-specific details to the Beam model.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Ning,
>>
>> I might have missed that in the discussion, but we talk about batch
>> execution, am I right? In streaming, all operators (PTransforms) of a
>> Pipeline are run in the same slots, thus the downsides are limited. You can
>> enforce streaming mode using --streaming command-line argument. But yes,
>> this might have other implications. For batch only it obviously makes sense
>> to limit parallelism of a (fused) 'stage', which is not an transform-level
>> concept, but rather a more complex union of transforms divided by shuffle
>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>> list for this for deeper discussion?
>>
>>  Jan
>> On 4/20/23 19:18, Ning Kang via user wrote:
>>
>> Hi Jan,
>>
>> The approach works when your pipeline doesn't have too many operators.
>> And the operator that needs the highest parallelism can only use at most
>> #total_task_slots / #operators resources available in the cluster.
>>
>> Another downside is wasted resources for other smaller operators who
>> cannot make full use of task slots assigned to them. You might see only
>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>
>> One redeeming method is that, for operators following another operator
>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>> parallelism. But this circles back to the first downside: if your pipeline
>> has exponentially high fanout through it, setting a single parallelism for
>> the whole pipeline is not ideal because it limits the scalability of your
>> pipeline significantly.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> this topic was discussed many years ago and the conclusion there was
>>> that setting the parallelism of individual operators via
>>> FlinkPipelineOptions (or ResourceHints) is be possible, but would be
>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>> high parallelism for operators with small inputs, does this actually bring
>>> any relevant performance impact? I always use parallelism based on the
>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>> any particular need or measurable impact of such approach?
>>>
>>>  Jan
>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>
>>> Same need here, using Flink runner. We are processing a pcollection
>>> (extracting features per element) then combining these into groups of
>>> features and running the next operator on those groups.
>>>
>>> Each group contains ~50 elements, so the parallelism of the operator
>>> upstream of the groupby should be higher, to be balanced with the
>>> downstream operator.
>>>
>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> It would be better to set parallelism for operators, as I mentioned
>>>> before, there may be multiple groupby, join operators in one pipeline, and
>>>> their parallelism can be different due to different input data sizes.
>>>>
>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Jeff - does setting the global default work for you, or do you need
>>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>>
>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>> operators.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>> such kind of mechanism, so I am looking for a general solution on the beam
>>>>>>>> side.
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To a (small) degree Sparks “new” AQE might be able to help
>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>
>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>>>> operator runs, so there is no need to have such controls. In fact these
>>>>>>>>>>>> specific controls wouldn't make much sense for the way Dataflow implements
>>>>>>>>>>>> these operators.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at compiling
>>>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <
>>>>>>>>>>>>>>>> zjffdu@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>