Posted to user@beam.apache.org by Jeff Zhang <zj...@gmail.com> on 2023/04/15 08:12:04 UTC

Is there any way to set the parallelism of operators like group by, join?

Besides the global parallelism of a Beam job, is there any way to set
parallelism for individual operators like group by and join? I
understand the parallelism setting depends on the underlying execution
engine, but it is very common to set the parallelism of operators like
group by and join in both Spark and Flink.

-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Kenneth Knowles <ke...@apache.org>.
FWIW I think parallelism is close enough to a resource. If you phrased it
like "how many CPUs can work independently" it is more closely related to
resources. Just like how many bits it takes to encode something is a
semantic property, but "RAM" is a resource.

I think a big role of resource hints is to be a bridge between the Beam
Model, which tries hard to only include essential information, and a
particular implementation which may not be able to autotune various
inessential/implementation details. Specifying parallelism for a runner that
still requires manual tuning of it seems like a fine use of this.

Kenn

On Fri, Apr 21, 2023 at 11:30 AM Jan Lukavský <je...@seznam.cz> wrote:

> Absolutely agree this is not something that should be part of the model.
> ResourceHints is a good place, but given how a Pipeline might get fused
> (and though this might be under the control of a runner, basically all
> runners use the same code, because there is currently no reason why this
> should be runner-specific), there is a problem with how to resolve
> conflicting settings. Also it is somewhat questionable whether parallelism is a
> "resource". It feels more like a runtime property. I tend to think that
> FlinkPipelineOptions could be a good place for that, because this seems to
> apply (mostly) to Flink batch runner.
> On 4/21/23 19:43, Robert Bradshaw via dev wrote:
>
> +1 to not requiring details like this in the Beam model. There is,
> however, the question of how to pass such implementation-detail specific
> hints to a runner that requires them. Generally that's done via
> ResourceHints or annotations, and while the former seems a good fit it's
> primarily focused on setting up the right context for user code (which GBK
> is not).
>
> A complete hack is to add an experiment like
> flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
> cleaner.
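As a rough illustration of Robert's hack (nothing below exists in Beam; the experiment name and the STAGE_NAME:value format come from the proposal above, while the parsing rules and stage names are assumptions), a runner could turn such experiment strings into a per-stage lookup table. One practical wrinkle is that real stage names are only known after the runner has fused the pipeline.

```python
def parse_stage_parallelism(experiments):
    """Collect hypothetical 'flink_parallelism_for_stage=STAGE_NAME:value'
    experiments into a {stage_name: parallelism} dict.

    NOTE: this experiment is only the proposal from the thread; it is not a
    real Beam or Flink runner option. Unrelated experiments are ignored.
    """
    prefix = "flink_parallelism_for_stage="
    overrides = {}
    for exp in experiments:
        if not exp.startswith(prefix):
            continue
        # Split on the last ':' so a stage name may itself contain ':'.
        stage, _, value = exp[len(prefix):].rpartition(":")
        if not stage:
            raise ValueError(f"missing stage name in {exp!r}")
        parallelism = int(value)  # raises ValueError if not an integer
        if parallelism < 1:
            raise ValueError(f"parallelism must be >= 1 in {exp!r}")
        overrides[stage] = parallelism
    return overrides


print(parse_stage_parallelism([
    "unrelated_experiment",
    "flink_parallelism_for_stage=GroupByKey:4",
    "flink_parallelism_for_stage=ExtractFeatures:32",
]))
# {'GroupByKey': 4, 'ExtractFeatures': 32}
```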
>
>
> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <us...@beam.apache.org>
> wrote:
>
>> Hi Jan,
>>
>> To generalize the per-stage parallelism configuration, we should have an
>> FR (feature request) proposing the capability to explicitly set the autoscaling
>> policy (in this case, a fixed size per stage) in Beam pipelines.
>>
>> Per-step or per-stage parallelism, and fusion/optimization, are not part of
>> the Beam model. They are [Flink] runner implementation details and should
>> be configured for each runner.
>>
>> Also, when building the pipeline, it's not clear what the fusion looks
>> like until the pipeline is submitted to a runner, thus making configuration
>> of the parallelism/worker-per-stage not straightforward.
>> Flink's parallelism settings can be found here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
>> it's still kind of a black box since you don't really know how many tasks
>> are actually spawned until you run a pipeline.
>>
>> That being said, if we have a general interface controlling how a
>> pipeline scales, each runner could adapt [auto]scaling in its own way.
>> For example, in a Flink job, each operator/stage's task slots are prorated
>> by its number of keys, and the maximum parallelism is throttled by task slot
>> utilization.
>> Another example, in a Dataflow job, each stage horizontally scales by CPU
>> utilization; vertically scales by memory/disk utilization.
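Ning's Flink example (task slots prorated by each stage's key count) can be sketched as a small allocation function. This illustrates the idea under stated assumptions and is not how the Flink runner behaves today: the stage names and key counts are hypothetical, every stage gets at least one slot, and no stage gets more slots than it has keys, since extra slots would sit idle. The next slot always goes to the stage with the most keys per already-assigned slot.

```python
import heapq


def prorate_slots(total_slots, keys_per_stage):
    """Split total_slots across stages roughly in proportion to key counts.

    Each stage gets at least 1 slot and never more slots than keys. Slots
    that no stage can use are left unassigned.
    """
    if total_slots < len(keys_per_stage):
        raise ValueError("need at least one slot per stage")
    alloc = {stage: 1 for stage in keys_per_stage}
    remaining = total_slots - len(keys_per_stage)
    # Max-heap (via negated values) on keys per assigned slot; ties are
    # broken by stage name so the result is deterministic.
    heap = [(-float(keys), stage)
            for stage, keys in keys_per_stage.items() if keys > 1]
    heapq.heapify(heap)
    while remaining > 0 and heap:
        _, stage = heapq.heappop(heap)
        alloc[stage] += 1
        remaining -= 1
        if alloc[stage] < keys_per_stage[stage]:
            heapq.heappush(heap, (-keys_per_stage[stage] / alloc[stage], stage))
    return alloc


print(prorate_slots(10, {"ExtractFeatures": 300, "GroupFeatures": 100, "GlobalSum": 1}))
# {'ExtractFeatures': 7, 'GroupFeatures': 2, 'GlobalSum': 1}
```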
>>
>> +dev@beam.apache.org <de...@beam.apache.org>
>> Let's use this thread to discuss how to configure a pipeline for runners
>> so that they can scale workers appropriately without exposing
>> runner-specific details to the Beam model.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Ning,
>>>
>>> I might have missed that in the discussion, but we are talking about batch
>>> execution, am I right? In streaming, all operators (PTransforms) of a
>>> Pipeline are run in the same slots, thus the downsides are limited. You can
>>> enforce streaming mode using the --streaming command-line argument. But yes,
>>> this might have other implications. For batch only it obviously makes sense
>>> to limit parallelism of a (fused) 'stage', which is not a transform-level
>>> concept, but rather a more complex union of transforms divided by shuffle
>>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>>> list for this for deeper discussion?
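Jan's notion of a fused 'stage' (transforms grouped between shuffle barriers) can be illustrated with a deliberately simplified model: a linear chain of transforms in which GroupByKey-like steps start a new stage. Real Beam fusion operates on a graph and is more involved; the Transform class, the is_shuffle flag, and the transform names below are hypothetical. A per-stage parallelism setting would attach to each inner list rather than to an individual transform.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    name: str
    is_shuffle: bool = False  # e.g. GroupByKey, CoGroupByKey, Reshuffle


def fuse_linear(transforms):
    """Split a *linear* chain of transforms into fused stages.

    Simplification: every shuffle is a stage boundary, and the shuffle's
    read side is placed at the head of the next stage. Everything between
    two shuffles runs fused in one stage.
    """
    stages = [[]]
    for t in transforms:
        if t.is_shuffle and stages[-1]:
            stages.append([])  # shuffle barrier: close the current stage
        stages[-1].append(t.name)
    return [s for s in stages if s]


pipeline = [
    Transform("Read"),
    Transform("ExtractFeatures"),
    Transform("GroupByKey", is_shuffle=True),
    Transform("ScoreGroup"),
    Transform("Reshuffle", is_shuffle=True),
    Transform("Write"),
]
print(fuse_linear(pipeline))
# [['Read', 'ExtractFeatures'], ['GroupByKey', 'ScoreGroup'], ['Reshuffle', 'Write']]
```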
>>>
>>>  Jan
>>> On 4/20/23 19:18, Ning Kang via user wrote:
>>>
>>> Hi Jan,
>>>
>>> The approach works when your pipeline doesn't have too many operators.
>>> And the operator that needs the highest parallelism can only use at most
>>> #total_task_slots / #operators resources available in the cluster.
>>>
>>> Another downside is wasted resources for other smaller operators that
>>> cannot make full use of task slots assigned to them. You might see only
>>> 1/10 of the tasks running while the other 9/10 sit idle for an operator with
>>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>>
>>> One mitigation is that, for operators following another operator
>>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>>> parallelism. But this circles back to the first downside: if your pipeline
>>> has exponentially high fanout through it, setting a single parallelism for
>>> the whole pipeline is not ideal because it limits the scalability of your
>>> pipeline significantly.
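Ning's Reshuffle point can be shown with a toy load model: when a high-fanout step runs fused with its input, all outputs of an input element stay on the task that read it, so a few inputs can leave most tasks idle; a shuffle redistributes the outputs. This sketch assumes round-robin redistribution and hypothetical sizes; Beam's Reshuffle actually assigns random keys, so real per-task counts are only approximately even.

```python
def fused_fanout_load(num_inputs, fanout, parallelism):
    """Elements per task when the fan-out runs fused with its input.

    Input i is read by task i % parallelism, and all `fanout` outputs
    of that input stay on the same task.
    """
    load = [0] * parallelism
    for i in range(num_inputs):
        load[i % parallelism] += fanout
    return load


def reshuffled_load(num_inputs, fanout, parallelism):
    """Elements per task after redistributing all outputs round-robin."""
    base, extra = divmod(num_inputs * fanout, parallelism)
    return [base + (1 if t < extra else 0) for t in range(parallelism)]


print(fused_fanout_load(2, 1000, 10))  # [1000, 1000, 0, 0, 0, 0, 0, 0, 0, 0]
print(reshuffled_load(2, 1000, 10))    # [200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
```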
>>>
>>> Ning.
>>>
>>>
>>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> this topic was discussed many years ago and the conclusion there was
>>>> that setting the parallelism of individual operators via
>>>> FlinkPipelineOptions (or ResourceHints) is possible, but would be
>>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>>> high parallelism for operators with small inputs, does this actually have
>>>> any relevant performance impact? I always use parallelism based on the
>>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>>> any particular need for, or measurable impact of, such an approach?
>>>>
>>>>  Jan
>>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>>
>>>> Same need here, using the Flink runner. We are processing a PCollection
>>>> (extracting features per element) then combining these into groups of
>>>> features and running the next operator on those groups.
>>>>
>>>> Each group contains ~50 elements, so the parallelism of the operator
>>>> upstream of the groupby should be higher, to be balanced with the
>>>> downstream operator.
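Nimalan's balancing argument can be made concrete with a proportional split of a worker budget between the per-element operator before the group-by and the per-group operator after it. Only the ~50 elements per group comes from the message; the worker count, element count, and per-item costs are hypothetical inputs you would have to measure, and the split rule (proportional to estimated total work) is an assumption.

```python
import math


def balanced_parallelism(total_workers, num_elements, group_size,
                         cost_per_element, cost_per_group):
    """Split a worker budget between the upstream (per-element) and the
    downstream (per-group) operator in proportion to each side's estimated
    total work. Costs are hypothetical, e.g. measured ms per item."""
    if total_workers < 2:
        raise ValueError("need at least one worker per side")
    num_groups = math.ceil(num_elements / group_size)
    up_work = num_elements * cost_per_element
    down_work = num_groups * cost_per_group
    up = round(total_workers * up_work / (up_work + down_work))
    up = min(max(up, 1), total_workers - 1)  # keep at least 1 worker per side
    # The downstream side cannot usefully run more tasks than there are groups.
    down = min(total_workers - up, num_groups)
    return up, down


# ~50 elements per group as in the message; the other numbers are made up.
print(balanced_parallelism(60, 1_000_000, 50,
                           cost_per_element=1, cost_per_group=10))  # (50, 10)
```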
>>>>
>>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> It would be better to set parallelism per operator. As I mentioned
>>>>> before, there may be multiple group by and join operators in one pipeline, and
>>>>> their parallelism can differ because of different input data sizes.
>>>>>
>>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Jeff - does setting the global default work for you, or do you need
>>>>>> per-operator control? Seems like it would be possible to add this to ResourceHints.
>>>>>>
>>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>>> operators.
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>>> such a mechanism, so I am looking for a general solution on the Beam
>>>>>>>>> side.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <
>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> To a (small) degree, Spark's “new” AQE (Adaptive Query Execution) might be able to help
>>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>>
>>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
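To try Holden's suggestion, the two settings are ordinary Spark configuration properties, so they can be passed to spark-submit with its --conf key=value flag. The helper below only builds those arguments; it is a sketch, and whether AQE helps a Beam job at all depends, as Holden says, on what Beam compiles the pipeline down to.

```python
# The two Spark properties named in the message, both switched on.
AQE_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def spark_submit_conf_args(conf):
    """Turn a dict of Spark properties into spark-submit '--conf k=v' args
    (sorted by key so the output is deterministic)."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(spark_submit_conf_args(AQE_CONF)))
# --conf spark.sql.adaptive.coalescePartitions.enabled=true --conf spark.sql.adaptive.enabled=true
```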
>>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> No, I don't use Dataflow, I use Spark & Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>>>>> operator runs, so there is no need to have such controls. In fact these
>>>>>>>>>>>>> specific controls wouldn't make much sense for the way Dataflow implements
>>>>>>>>>>>>> these operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>>>>>>> operator level. And the input size of an operator is unknown at compile
>>>>>>>>>>>>>>>> time if it is not a source operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's an example for Flink:
>>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
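The Spark knob Jeff refers to is the optional partition count on RDD operations such as rdd.reduceByKey(func, numPartitions) and rdd.groupByKey(numPartitions). The following is a pure-Python imitation of what that argument controls (how many hash partitions the shuffled output is split into); it does not use Spark, it ignores Spark's map-side combining, and crc32 is an assumed stand-in for Spark's partitioner hash.

```python
import zlib
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func, num_partitions):
    """Toy reduceByKey: hash-partition keys into num_partitions buckets and
    reduce the values of each key. Returns one dict per partition so the
    effect of num_partitions is visible. Keys are assumed to be str."""
    per_key = defaultdict(list)
    for key, value in pairs:
        per_key[key].append(value)
    partitions = [dict() for _ in range(num_partitions)]
    for key, values in per_key.items():
        # Stable hash (Python's str hash is randomized per process).
        p = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[p][key] = reduce(func, values)
    return partitions


words = "a b a c b a".split()
parts = reduce_by_key([(w, 1) for w in words], lambda x, y: x + y, num_partitions=4)
print(len(parts))  # 4
```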
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
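Reuven's point that the number of keys caps GroupByKey parallelism can be checked with a toy hash-partitioning model: however many subtasks are configured, only subtasks that receive at least one distinct key have work. The assumption here is that keys are routed by a stable hash (crc32 stands in for whatever the runner uses); the key names are made up.

```python
import zlib


def busy_subtasks(keys, parallelism):
    """How many of `parallelism` subtasks receive at least one key."""
    return len({zlib.crc32(k.encode("utf-8")) % parallelism for k in set(keys)})


def max_gbk_parallelism(keys, parallelism):
    """Upper bound on useful GroupByKey parallelism: never more than the
    configured parallelism, and never more than the number of distinct keys.
    Hash collisions can push the real number (busy_subtasks) even lower."""
    return min(parallelism, len(set(keys)))


keys = ["user-1", "user-2", "user-3"] * 1000
print(max_gbk_parallelism(keys, 100))      # 3
print(busy_subtasks(["total"] * 500, 10))  # 1: a single key keeps one subtask busy
```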
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>>> https://amzn.to/2MaRAG9
>>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jan Lukavský <je...@seznam.cz>.
Absolutely agree this is not something that should be part of the model. 
ResourceHints is a good place, but given how a Pipeline might get fused
(and though this might be under the control of a runner, basically all
runners use the same code, because there is currently no reason why this
should be runner-specific), there is a problem with how to resolve
conflicting settings. Also it is somewhat questionable whether parallelism is
a "resource". It feels more like a runtime property. I tend to think 
that FlinkPipelineOptions could be a good place for that, because this 
seems to apply (mostly) to Flink batch runner.
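Jan's concern about conflicting settings arises because a hint attached to a transform must become a setting for the whole fused stage it lands in. One possible resolution policy, sketched with hypothetical per-transform hints (Beam has no such parallelism hint today), is to let the largest request win and let unhinted stages fall back to the pipeline-wide default. Taking the minimum, or rejecting conflicting hints outright, would be equally defensible choices.

```python
def resolve_stage_parallelism(stage, hints, default):
    """Pick one parallelism for a fused stage from per-transform hints.

    stage:   list of transform names fused into one stage
    hints:   dict transform name -> requested parallelism (hypothetical)
    default: pipeline-wide parallelism used when no transform has a hint
    Policy (an assumption): the largest request wins.
    """
    requested = [hints[t] for t in stage if t in hints]
    return max(requested) if requested else default


def distinct_requests(stage, hints):
    """Sorted distinct hint values in a stage; more than one is a conflict."""
    return sorted({hints[t] for t in stage if t in hints})


hints = {"GroupByKey": 4, "ScoreGroup": 16}
print(resolve_stage_parallelism(["GroupByKey", "ScoreGroup"], hints, default=8))  # 16
print(distinct_requests(["GroupByKey", "ScoreGroup"], hints))                     # [4, 16]
print(resolve_stage_parallelism(["Read", "Parse"], hints, default=8))             # 8
```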

>>>                                                                 to
>>>                                                                 automatically
>>>                                                                 determine
>>>                                                                 the
>>>                                                                 parallelism
>>>                                                                 (e.g.
>>>                                                                 work
>>>                                                                 will
>>>                                                                 be
>>>                                                                 dynamically
>>>                                                                 split
>>>                                                                 and
>>>                                                                 moved
>>>                                                                 around
>>>                                                                 between
>>>                                                                 workers,
>>>                                                                 the
>>>                                                                 number
>>>                                                                 of
>>>                                                                 workers
>>>                                                                 will
>>>                                                                 autoscale,
>>>                                                                 etc.),
>>>                                                                 so
>>>                                                                 there's
>>>                                                                 no
>>>                                                                 need
>>>                                                                 to
>>>                                                                 explicitly
>>>                                                                 set
>>>                                                                 the
>>>                                                                 parallelism
>>>                                                                 of
>>>                                                                 the
>>>                                                                 execution.
>>>
>>>                                                                 On
>>>                                                                 Sat,
>>>                                                                 Apr
>>>                                                                 15,
>>>                                                                 2023
>>>                                                                 at
>>>                                                                 1:12 AM
>>>                                                                 Jeff
>>>                                                                 Zhang
>>>                                                                 <zj...@gmail.com>
>>>                                                                 wrote:
>>>
>>>                                                                     Besides
>>>                                                                     the
>>>                                                                     global
>>>                                                                     parallelism of
>>>                                                                     beam
>>>                                                                     job,
>>>                                                                     is
>>>                                                                     there
>>>                                                                     any
>>>                                                                     way
>>>                                                                     to
>>>                                                                     set
>>>                                                                     parallelism for
>>>                                                                     individual
>>>                                                                     operators
>>>                                                                     like
>>>                                                                     group
>>>                                                                     by
>>>                                                                     and
>>>                                                                     join?
>>>                                                                     I
>>>                                                                     understand the
>>>                                                                     parallelism setting
>>>                                                                     depends
>>>                                                                     on
>>>                                                                     the
>>>                                                                     underlying
>>>                                                                     execution
>>>                                                                     engine,
>>>                                                                     but
>>>                                                                     it
>>>                                                                     is
>>>                                                                     very
>>>                                                                     common
>>>                                                                     to
>>>                                                                     set
>>>                                                                     parallelism like
>>>                                                                     group
>>>                                                                     by
>>>                                                                     and
>>>                                                                     join
>>>                                                                     in
>>>                                                                     both
>>>                                                                     spark
>>>                                                                     &
>>>                                                                     flink.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>                                                                     -- 
>>>                                                                     Best
>>>                                                                     Regards
>>>
>>>                                                                     Jeff
>>>                                                                     Zhang
>>>
>>>
>>>
>>>                                                             -- 
>>>                                                             Best Regards
>>>
>>>                                                             Jeff Zhang
>>>
>>>
>>>
>>>                                                     -- 
>>>                                                     Best Regards
>>>
>>>                                                     Jeff Zhang
>>>
>>>
>>>
>>>                                             -- 
>>>                                             Best Regards
>>>
>>>                                             Jeff Zhang
>>>
>>>                                     -- 
>>>                                     Twitter:
>>>                                     https://twitter.com/holdenkarau
>>>                                     Books (Learning Spark, High
>>>                                     Performance Spark, etc.):
>>>                                     https://amzn.to/2MaRAG9
>>>                                     YouTube Live Streams:
>>>                                     https://www.youtube.com/user/holdenkarau
>>>
>>>
>>>
>>>                                 -- 
>>>                                 Best Regards
>>>
>>>                                 Jeff Zhang
>>>
>>>
>>>
>>>                 -- 
>>>                 Best Regards
>>>
>>>                 Jeff Zhang
>>>
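
For readers less familiar with the Spark side of Jeff's question: Spark's RDD API lets an individual shuffle operation pick its own partition count, for example `rdd.reduceByKey(func, numPartitions)`. The pure-Python sketch below only emulates that idea. Each pair is routed to `hash(key) % num_partitions`, then values are reduced per key inside each partition, so the effect of a per-operator partition count is visible without a cluster. `reduce_by_key` is a hypothetical helper, not a Spark or Beam API.

```python
from collections import defaultdict
from functools import reduce
from typing import Callable, Dict, Hashable, Iterable, List, Tuple


def reduce_by_key(
    pairs: Iterable[Tuple[Hashable, int]],
    func: Callable[[int, int], int],
    num_partitions: int,
) -> List[Dict[Hashable, int]]:
    """Hypothetical emulation of a keyed reduce with its own partition count.

    Each (key, value) pair is routed to partition hash(key) % num_partitions,
    mimicking a shuffle; the values of each key are then reduced with `func`
    inside the partition that owns the key.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    partitions: List[Dict[Hashable, List[int]]] = [
        defaultdict(list) for _ in range(num_partitions)
    ]
    for key, value in pairs:
        partitions[hash(key) % num_partitions][key].append(value)
    return [{k: reduce(func, vs) for k, vs in part.items()} for part in partitions]


if __name__ == "__main__":
    words = ["a", "b", "a", "c", "b", "a"]
    counts = reduce_by_key(((w, 1) for w in words), lambda x, y: x + y, num_partitions=4)
    merged = {k: v for part in counts for k, v in part.items()}
    print(len(counts), sorted(merged.items()))  # 4 [('a', 3), ('b', 2), ('c', 1)]
```

Changing `num_partitions` changes how many reducer partitions exist for this one operation without changing the result. That is the kind of per-operator knob the question asks Beam to expose.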

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
+1 to not requiring details like this in the Beam model. There is, however,
the question of how to pass such implementation-detail specific hints to a
runner that requires them. Generally that's done via ResourceHints or
annotations, and while the former seems a good fit it's primarily focused
on setting up the right context for user code (which GBK is not).

A complete hack is to add an experiment like
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
cleaner.
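
The following makes the shape of that hack concrete. Beam experiments are free-form strings passed with `--experiments`, so a runner would have to parse values in the `flink_parallelism_for_stage=STAGE_NAME:value` form mentioned above. The sketch assumes exactly that syntax, with one stage per experiment string (repeat the experiment for several stages), and uses a hypothetical `parse_stage_parallelism` helper. Nothing in this thread says such an experiment exists in Beam.

```python
from typing import Dict, Iterable

# Hypothetical experiment name taken from the message above; not a real Beam flag.
EXPERIMENT_PREFIX = "flink_parallelism_for_stage="


def parse_stage_parallelism(experiments: Iterable[str]) -> Dict[str, int]:
    """Map STAGE_NAME -> parallelism from hypothetical experiment strings.

    Assumed syntax: one 'flink_parallelism_for_stage=STAGE_NAME:value' per
    experiment string. Later entries for the same stage win.
    """
    result: Dict[str, int] = {}
    for exp in experiments:
        if not exp.startswith(EXPERIMENT_PREFIX):
            continue  # some other experiment; not ours to interpret
        # rpartition keeps stage names that themselves contain ':' intact
        stage, sep, value = exp[len(EXPERIMENT_PREFIX):].rpartition(":")
        if not sep or not stage:
            raise ValueError(f"expected STAGE_NAME:value in {exp!r}")
        parallelism = int(value)  # also raises ValueError for non-numbers
        if parallelism < 1:
            raise ValueError(f"parallelism must be >= 1 in {exp!r}")
        result[stage] = parallelism
    return result


if __name__ == "__main__":
    experiments = [
        "some_other_experiment",
        "flink_parallelism_for_stage=ReadLogs:4",
        "flink_parallelism_for_stage=GroupByUser:32",
    ]
    print(parse_stage_parallelism(experiments))  # {'ReadLogs': 4, 'GroupByUser': 32}
```

The sketch keeps one stage per experiment string so it does not rely on commas inside the value, because some SDKs may already use commas to separate experiments.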


On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <us...@beam.apache.org>
wrote:

> Hi Jan,
>
> To generalize the per-stage parallelism configuration, we should have a
> feature request (FR) proposing the capability to explicitly set an
> autoscaling (in this case, fixed size per stage) policy in Beam pipelines.
>
> Per-step or per-stage parallelism, or fusion/optimization is not part of
> the Beam model. They are [Flink] runner implementation details and should
> be configured for each runner.
>
> Also, when building the pipeline, it's not clear what the fusion looks
> like until the pipeline is submitted to a runner, thus making configuration
> of the parallelism/worker-per-stage not straightforward.
> Flink's parallelism settings can be found here
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
> it's still kind of a black box since you don't really know how many tasks
> are actually spawned until you run a pipeline.
>
> That being said, if we have a general interface controlling how a pipeline
> scales, each runner could adapt [auto]scaling in their own way.
> For example, in a Flink job, each operator/stage's task slot is prorated
> by their key numbers; the maximum parallelism is throttled by task slot
> utilization.
> Another example, in a Dataflow job, each stage horizontally scales by CPU
> utilization; vertically scales by memory/disk utilization.
>
> +dev@beam.apache.org <de...@beam.apache.org>
> Let's use this thread to discuss how to configure a pipeline for runners
> so that they can scale workers appropriately without exposing
> runner-specific details to the Beam model.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Ning,
>>
>> I might have missed that in the discussion, but we are talking about batch
>> execution, am I right? In streaming, all operators (PTransforms) of a
>> Pipeline are run in the same slots, thus the downsides are limited. You can
>> enforce streaming mode using the --streaming command-line argument. But yes,
>> this might have other implications. For batch only, it obviously makes sense
>> to limit parallelism of a (fused) 'stage', which is not a transform-level
>> concept, but rather a more complex union of transforms divided by shuffle
>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>> list for this for deeper discussion?
>>
>>  Jan
>> On 4/20/23 19:18, Ning Kang via user wrote:
>>
>> Hi Jan,
>>
>> The approach works when your pipeline doesn't have too many operators.
>> And the operator that needs the highest parallelism can only use at most
>> #total_task_slots / #operators resources available in the cluster.
>>
>> Another downside is wasted resources for other smaller operators that
>> cannot make full use of task slots assigned to them. You might see only
>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>
>> One redeeming method is that, for operators following another operator
>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>> parallelism. But this circles back to the first downside: if your pipeline
>> has exponentially high fanout through it, setting a single parallelism for
>> the whole pipeline is not ideal because it limits the scalability of your
>> pipeline significantly.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> this topic was discussed many years ago and the conclusion there was
>>> that setting the parallelism of individual operators via
>>> FlinkPipelineOptions (or ResourceHints) is possible, but would be
>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>> high parallelism for operators with small inputs, does this actually bring
>>> any relevant performance impact? I always use parallelism based on the
>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>> any particular need or measurable impact of such an approach?
>>>
>>>  Jan
>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>
>>> Same need here, using Flink runner. We are processing a pcollection
>>> (extracting features per element) then combining these into groups of
>>> features and running the next operator on those groups.
>>>
>>> Each group contains ~50 elements, so the parallelism of the operator
>>> upstream of the groupby should be higher, to be balanced with the
>>> downstream operator.
>>>
>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> It would be better to set parallelism for operators, as I mentioned
>>>> before, there may be multiple groupby, join operators in one pipeline, and
>>>> their parallelism can be different due to different input data sizes.
>>>>
>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Jeff - does setting the global default work for you, or do you need
>>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>>
>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>> operators.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>> such a mechanism, so I am looking for a general solution on the Beam
>>>>>>>> side.
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To a (small) degree Spark's “new” AQE might be able to help
>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>
>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>>>> operator runs, so there is no need to have such controls. In fact these
>>>>>>>>>>>> specific controls wouldn't make much sense for the way Dataflow implements
>>>>>>>>>>>> these operators.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at the
>>>>>>>>>>>>>>> compile stage if it is not a source operator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <
>>>>>>>>>>>>>>>> zjffdu@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>> https://amzn.to/2MaRAG9
>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Ning Kang via user <us...@beam.apache.org>.
Hi Jan,

To generalize the per-stage parallelism configuration, we should have a
feature request (FR) proposing the capability to explicitly set an
autoscaling (in this case, fixed size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, thus making configuration of
the parallelism/worker-per-stage not straightforward.
Flink's parallelism settings can be found here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
it's still kind of a black box since you don't really know how many tasks
are actually spawned until you run a pipeline.
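
A toy model of fusion shows why the fused shape is hard to predict at construction time. Consecutive element-wise steps run together in one stage, and each transform that needs a shuffle (such as a GroupByKey) starts a new stage. This is a simplified sketch that assumes a purely linear pipeline. Real runners also handle branching graphs, side inputs and flattens, which it ignores. The step names are made up.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Step:
    name: str
    needs_shuffle: bool = False  # e.g. a GroupByKey consuming shuffled input


def fuse_linear(steps: List[Step]) -> List[List[str]]:
    """Group a linear chain of steps into fused stages.

    Element-wise steps are appended to the current stage; a step that needs a
    shuffle begins a new stage, because its input must first cross a shuffle
    barrier. (The write side of that shuffle really belongs to the previous
    stage; this toy model only tracks which step names run together.)
    """
    stages: List[List[str]] = []
    for step in steps:
        if not stages or step.needs_shuffle:
            stages.append([])
        stages[-1].append(step.name)
    return stages


if __name__ == "__main__":
    pipeline = [
        Step("Read"), Step("ParseJson"), Step("KeyByUser"),
        Step("GroupByUser", needs_shuffle=True), Step("SumClicks"),
        Step("GroupByRegion", needs_shuffle=True), Step("Write"),
    ]
    for index, stage in enumerate(fuse_linear(pipeline)):
        print(index, stage)
    # 0 ['Read', 'ParseJson', 'KeyByUser']
    # 1 ['GroupByUser', 'SumClicks']
    # 2 ['GroupByRegion', 'Write']
```

In this model, a per-stage parallelism setting would attach to the entries of the returned list, not to individual transforms.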

That being said, if we have a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in their own way.
For example, in a Flink job, each operator/stage's task slot is prorated by
their key numbers; the maximum parallelism is throttled by task slot
utilization.
Another example, in a Dataflow job, each stage horizontally scales by CPU
utilization; vertically scales by memory/disk utilization.
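
The Flink example above, with slots prorated by each stage's number of keys, can be written down as a small allocation function. This is one possible reading of that policy, not Beam or Flink behavior. Every stage gets one slot, and the remaining slots are split in proportion to key counts. No stage gets more slots than it has keys, since the extra subtasks would have nothing to process. `prorate_slots` and the stage names are hypothetical.

```python
from typing import Dict


def prorate_slots(total_slots: int, keys_per_stage: Dict[str, int]) -> Dict[str, int]:
    """Hypothetical policy: split task slots across stages by key count.

    Every stage first gets one slot; the spare slots are shared in proportion
    to each stage's number of keys (rounded down), and a stage is never given
    more slots than it has keys. The total never exceeds total_slots; slots
    lost to rounding or to the cap are simply left unassigned here.
    """
    if not keys_per_stage:
        return {}
    if any(keys < 1 for keys in keys_per_stage.values()):
        raise ValueError("every stage needs at least one key")
    if total_slots < len(keys_per_stage):
        raise ValueError("need at least one slot per stage")
    spare = total_slots - len(keys_per_stage)
    total_keys = sum(keys_per_stage.values())
    return {
        stage: min(keys, 1 + spare * keys // total_keys)
        for stage, keys in keys_per_stage.items()
    }


if __name__ == "__main__":
    keys = {"ExtractFeatures": 5000, "GroupByUser": 100, "GlobalSum": 1}
    print(prorate_slots(100, keys))
    # {'ExtractFeatures': 96, 'GroupByUser': 2, 'GlobalSum': 1}
```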

+dev@beam.apache.org <de...@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we are talking about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using the --streaming command-line argument. But yes,
> this might have other implications. For batch only, it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not a transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators. And
> the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators resources available in the cluster.
>
> Another downside is wasted resources for other smaller operators that
> cannot make full use of task slots assigned to them. You might see only
> 1/10 tasks running while the other 9/10 tasks idle for an operator with
> parallelism 10, especially when it's doing some aggregation like a SUM.
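
The two downsides described here can be put in numbers with a small model. An even split of slots caps every operator at `total_slots // num_operators`. When records are routed to subtasks by a hash of their key, an operator can never keep more subtasks busy than it has distinct keys. The sketch uses `zlib.crc32` as a stand-in for the runner's key hashing (Flink uses its own key-group hashing), and both helpers are hypothetical.

```python
import zlib
from typing import Iterable


def max_slots_per_operator(total_slots: int, num_operators: int) -> int:
    """Even split of slots: no operator gets more than this many."""
    return total_slots // num_operators


def busy_subtasks(keys: Iterable[str], parallelism: int) -> int:
    """How many of `parallelism` subtasks receive at least one record.

    Records are routed by a hash of their key; zlib.crc32 stands in for the
    runner's real key hashing. The result can never exceed the number of
    distinct keys, however high the parallelism is set.
    """
    return len({zlib.crc32(key.encode("utf-8")) % parallelism for key in keys})


if __name__ == "__main__":
    print(max_slots_per_operator(total_slots=200, num_operators=10))  # 20
    # In this simplified model a global SUM ends in a single key,
    # so one of the ten subtasks does all the work.
    print(busy_subtasks(["all"] * 1000, parallelism=10))  # 1
```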
>
> One redeeming method is that, for operators following another operator
> with high fanout, we can explicitly add a Reshuffle to allow a higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it limits the scalability of your
> pipeline significantly.
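
The Reshuffle workaround described here can be illustrated with a small simulation. In this model, a high-fanout step that stays fused with its input processes all of its outputs on the subtask that received the input. Attaching a random key to every output and shuffling on it instead spreads the work over all subtasks. That is the idea behind Beam's `Reshuffle` (for example `Reshuffle.viaRandomKey()` in Java or `beam.Reshuffle()` in Python). The code below, however, is a stand-alone simulation built around a hypothetical `place_outputs` helper, not Beam code.

```python
import random
from collections import Counter
from typing import List


def place_outputs(outputs: List[int], producer_subtask: int, parallelism: int,
                  reshuffle: bool, seed: int = 0) -> Counter:
    """Count how many outputs of a fan-out step each subtask processes.

    Without a reshuffle the consumer is fused with the producer, so every
    output is handled on the subtask that produced it. With a reshuffle each
    output gets a random key and is routed by key % parallelism.
    """
    if not reshuffle:
        return Counter({producer_subtask: len(outputs)})
    rng = random.Random(seed)  # seeded so the simulation is repeatable
    return Counter(rng.getrandbits(32) % parallelism for _ in outputs)


if __name__ == "__main__":
    outputs = list(range(1000))  # one input element fanned out 1000x
    print(place_outputs(outputs, producer_subtask=3, parallelism=10, reshuffle=False))
    # Counter({3: 1000})
    spread = place_outputs(outputs, producer_subtask=3, parallelism=10, reshuffle=True)
    print(len(spread), sum(spread.values()))
```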
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) is possible, but would be somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually bring any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline and this seems to work just fine. Is there any particular
>> need or measurable impact of such an approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using Flink runner. We are processing a pcollection
>> (extracting features per element) then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the groupby should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism for operators, as I mentioned
>>> before, there may be multiple groupby, join operators in one pipeline, and
>>> their parallelism can be different due to different input data sizes.
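
The point about multiple GroupBy/join operators needing different parallelism can be illustrated with a simple sizing rule. Divide each operator's estimated input size by a target amount of data per subtask, then clamp the result. The rule, the 128 MiB target and the size estimates are all assumptions for illustration. As Jeff notes elsewhere in the thread, the input size of a non-source operator is generally not known when the pipeline is constructed.

```python
from typing import Dict

# Assumed tuning target for illustration only; not a Beam, Flink or Spark default.
TARGET_BYTES_PER_SUBTASK = 128 * 1024 * 1024


def parallelism_for(input_bytes: int, max_parallelism: int,
                    target_bytes: int = TARGET_BYTES_PER_SUBTASK) -> int:
    """Pick an operator's parallelism from an estimate of its input size."""
    if input_bytes < 0:
        raise ValueError("input_bytes must be non-negative")
    wanted = -(-input_bytes // target_bytes)  # integer ceiling division
    return max(1, min(wanted, max_parallelism))


if __name__ == "__main__":
    # Made-up size estimates for three operators in one pipeline.
    estimates: Dict[str, int] = {
        "JoinOrdersWithUsers": 50 * 1024**3,  # ~50 GiB
        "GroupByCountry": 200 * 1024**2,      # ~200 MiB
        "GlobalCount": 4 * 1024,              # ~4 KiB
    }
    for name, size in estimates.items():
        print(name, parallelism_for(size, max_parallelism=256))
    # JoinOrdersWithUsers 256 / GroupByCountry 2 / GlobalCount 1
```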
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>> such a mechanism, so I am looking for a general solution on the Beam
>>>>>>> side.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To a (small) degree Spark's “new” AQE might be able to help
>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>
>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>
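
To try the AQE suggestion, the two properties can be passed like any other Spark configuration, for example as `--conf key=value` arguments to `spark-submit`. AQE is a Spark SQL feature, so it can only help where the runner's translation produces Dataset/SQL plans rather than plain RDD operations. The helper and the jar name below are hypothetical. Other arguments a real submission needs, such as the main class, are omitted.

```python
from typing import Dict, List

# The two Spark properties suggested in the message above.
AQE_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def spark_submit_conf_args(conf: Dict[str, str]) -> List[str]:
    """Turn Spark properties into repeated `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # "my-beam-pipeline.jar" is a placeholder for your bundled pipeline.
    command = ["spark-submit", *spark_submit_conf_args(AQE_CONF), "my-beam-pipeline.jar"]
    print(" ".join(command))
```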
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>>>>>> operators.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at the
>>>>>>>>>>>>>> compile stage if it is not a source operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>> https://amzn.to/2MaRAG9
>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Ning Kang via dev <de...@beam.apache.org>.
Hi Jan,

To generalize the per-stage parallelism configuration, we should have a feature
request (FR) proposing the ability to explicitly set an autoscaling policy (in
this case, a fixed size per stage) in Beam pipelines.

Per-step or per-stage parallelism, as well as fusion and other optimizations,
are not part of the Beam model. They are [Flink] runner implementation details
and should be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, which makes configuring the
parallelism (or workers) per stage not straightforward.
Flink's parallelism settings are documented here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
but they are still something of a black box, since you don't really know how
many tasks are actually spawned until you run a pipeline.

That being said, if we had a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in its own way.
For example, in a Flink job, each operator's (or stage's) task slots are
prorated by its number of keys, and the maximum parallelism is throttled by
task slot utilization.
As another example, in a Dataflow job, each stage scales horizontally based on
CPU utilization and vertically based on memory/disk utilization.
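A minimal Python sketch of the proration idea, assuming the key count of each stage were known up front (which, as noted earlier in the thread, is often not the case before the job runs). The function name, stage names, and numbers are hypothetical and not part of any Flink or Beam API. Each stage is capped at its own key count, because a keyed step cannot usefully run more tasks than it has keys; the utilization-based throttling is not modeled.

```python
from typing import Dict


def prorate_parallelism(key_counts: Dict[str, int],
                        total_slots: int) -> Dict[str, int]:
    """Split total_slots across stages in proportion to their key counts.

    Each stage gets at least one slot and never more slots than it has keys.
    """
    total_keys = sum(key_counts.values())
    if total_keys == 0:
        raise ValueError("need at least one key to prorate slots")
    result = {}
    for stage, keys in key_counts.items():
        share = total_slots * keys // total_keys  # proportional share, rounded down
        result[stage] = max(1, min(share, keys))  # cap at key count, floor at 1
    return result


print(prorate_parallelism({"GroupByUser": 600, "GroupByCountry": 20, "GlobalSum": 1}, 64))
# {'GroupByUser': 61, 'GroupByCountry': 2, 'GlobalSum': 1}
```

Because every stage gets at least one slot, the total can slightly exceed the budget when a pipeline has many tiny stages.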

+dev@beam.apache.org <de...@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we are talking about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, so the downsides are limited. You can
> enforce streaming mode using the --streaming command-line argument. But yes,
> this might have other implications. For batch only, it obviously makes sense
> to limit the parallelism of a (fused) 'stage', which is not a transform-level
> concept but rather a more complex union of transforms divided by shuffle
> barriers. Would you be willing to start a follow-up thread on the @dev
> mailing list for a deeper discussion of this?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators.
> Also, the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators of the resources available in the cluster.
>
> Another downside is wasted resources for smaller operators that cannot
> make full use of the task slots assigned to them. You might see only
> 1 of 10 tasks running while the other 9 sit idle for an operator with
> parallelism 10, especially when it's doing an aggregation like a SUM.
>
> One mitigation is that, for operators following another operator with
> high fanout, we can explicitly add a Reshuffle to allow higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it significantly limits the
> scalability of your pipeline.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago, and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) is possible, but would be somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually have any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline, and this seems to work just fine. Is there any particular
>> need for, or measurable impact of, such an approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using the Flink runner. We are processing a PCollection
>> (extracting features per element), then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the group-by should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism per operator. As I mentioned
>>> before, there may be multiple group-by and join operators in one pipeline,
>>> and their parallelism can differ due to different input data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>> such a mechanism, so I am looking for a general solution on the Beam
>>>>>>> side.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To a (small) degree, Spark's “new” AQE (Adaptive Query Execution) might
>>>>>>>> be able to help, depending on what kind of operations Beam is compiling
>>>>>>>> it down to.
>>>>>>>>
>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't use Dataflow; I use Spark & Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>>>>>> operators.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at
>>>>>>>>>>>>>> operator level. The input size of an operator is unknown at compile
>>>>>>>>>>>>>> time if it is not a source operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an example from Flink:
>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey
>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any
>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>>> engine, but it is very common to set the parallelism of operators like
>>>>>>>>>>>>>>>> group by and join in both Spark and Flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>> https://amzn.to/2MaRAG9
>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Ning,

I might have missed that in the discussion, but we are talking about batch
execution, am I right? In streaming, all operators (PTransforms) of a
Pipeline are run in the same slots, so the downsides are limited. You
can enforce streaming mode using the --streaming command-line argument. But
yes, this might have other implications. For batch only, it obviously
makes sense to limit the parallelism of a (fused) 'stage', which is not a
transform-level concept but rather a more complex union of transforms
divided by shuffle barriers. Would you be willing to start a follow-up
thread on the @dev mailing list for a deeper discussion of this?
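To make the "fused stage" point concrete, here is a rough Python sketch that assumes a purely linear chain of transforms and a hypothetical per-transform parallelism hint. Real runners fuse over the full pipeline graph, and taking the max() of conflicting hints inside a stage is only one possible resolution rule, not something Beam defines.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Transform:
    name: str
    is_shuffle: bool = False                 # e.g. GroupByKey, CoGroupByKey, Reshuffle
    parallelism_hint: Optional[int] = None   # hypothetical per-transform hint


def fuse_into_stages(chain: List[Transform]) -> List[List[Transform]]:
    """Split a linear chain into fused stages; each shuffle opens a new stage."""
    stages: List[List[Transform]] = []
    for t in chain:
        if not stages or t.is_shuffle:
            stages.append([])
        stages[-1].append(t)
    return stages


def stage_parallelism(stage: List[Transform], default: int) -> int:
    """Resolve conflicting hints inside one stage by taking the largest."""
    hints = [t.parallelism_hint for t in stage if t.parallelism_hint is not None]
    return max(hints) if hints else default


chain = [
    Transform("Read"),
    Transform("ExtractFeatures", parallelism_hint=100),
    Transform("GroupByKey", is_shuffle=True),
    Transform("ScoreGroups", parallelism_hint=4),
    Transform("Write", parallelism_hint=2),
]
for stage in fuse_into_stages(chain):
    print([t.name for t in stage], stage_parallelism(stage, default=8))
# ['Read', 'ExtractFeatures'] 100
# ['GroupByKey', 'ScoreGroups', 'Write'] 4
```

Here the Write hint of 2 is silently overridden by the ScoreGroups hint of 4, which is the kind of conflict any per-transform setting would have to resolve once transforms are fused.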

  Jan


Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Ning Kang via user <us...@beam.apache.org>.
Hi Jan,

The approach works when your pipeline doesn't have too many operators. Also,
the operator that needs the highest parallelism can only use at most
#total_task_slots / #operators of the resources available in the cluster.

Another downside is wasted resources for smaller operators that cannot
make full use of the task slots assigned to them. You might see only 1 of 10
tasks running while the other 9 sit idle for an operator with parallelism
10, especially when it's doing an aggregation like a SUM.
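The arithmetic behind these two points, as a small Python sketch. The even split of slots across operators is the bound described above rather than a documented Flink rule, and the idle-task count is a lower bound: a keyed aggregation routes each key to one task, so at most min(parallelism, keys) tasks receive data (hash skew can leave even more idle). Function names are hypothetical.

```python
def per_operator_share(total_task_slots: int, num_operators: int) -> int:
    """Slots per operator if the cluster's slots were split evenly."""
    return total_task_slots // num_operators


def min_idle_tasks(parallelism: int, num_keys: int) -> int:
    """Lower bound on idle tasks for a keyed aggregation.

    Each key is processed by exactly one task, so at most
    min(parallelism, num_keys) tasks can receive any data.
    """
    busy = min(parallelism, num_keys)
    return parallelism - busy


print(per_operator_share(100, 10))  # 10
print(min_idle_tasks(10, 1))        # 9
print(min_idle_tasks(10, 500))      # 0
```

A global SUM has a single key, so at parallelism 10 at least 9 of its tasks have nothing to do.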

One mitigation is that, for operators following another operator with
high fanout, we can explicitly add a Reshuffle to allow higher
parallelism. But this circles back to the first downside: if your pipeline
has exponentially high fanout through it, setting a single parallelism for
the whole pipeline is not ideal because it significantly limits the
scalability of your pipeline.
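A toy Python model of why the Reshuffle helps. It assumes that a fused high-fanout step keeps its outputs on the task that produced them, and that a reshuffle spreads them evenly (round-robin here, as a stand-in for the random keys a real Reshuffle would assign). It is not Beam code; the function and its parameters are hypothetical.

```python
from collections import Counter
from typing import Dict


def task_loads(num_inputs: int, fanout: int, parallelism: int,
               reshuffle: bool) -> Dict[int, int]:
    """Count downstream elements handled by each task in a toy fusion model."""
    loads: Counter = Counter()
    out_index = 0
    for i in range(num_inputs):
        parent_task = i % parallelism  # task that processed the input element
        for _ in range(fanout):
            if reshuffle:
                # Reshuffled: outputs are spread evenly over all tasks.
                loads[out_index % parallelism] += 1
            else:
                # Fused: each output stays on the task that produced it.
                loads[parent_task] += 1
            out_index += 1
    return {task: loads[task] for task in range(parallelism)}


print(task_loads(2, 1000, 10, reshuffle=False))
print(task_loads(2, 1000, 10, reshuffle=True))
```

With 2 input elements, a fanout of 1000, and parallelism 10, only 2 tasks do all 2000 units of downstream work without the reshuffle, versus 200 per task with it. The model also shows the limit raised above: the reshuffle can only spread work across the single, pipeline-wide parallelism (10 here).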

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> this topic was discussed many years ago, and the conclusion there was that
> setting the parallelism of individual operators via FlinkPipelineOptions
> (or ResourceHints) is possible, but would be somewhat cumbersome.
> Although I understand that it "feels" weird to have high parallelism for
> operators with small inputs, does this actually have any relevant
> performance impact? I always use parallelism based on the largest operator
> in the Pipeline, and this seems to work just fine. Is there any particular
> need for, or measurable impact of, such an approach?
>
>  Jan
> On 4/19/23 17:23, Nimalan Mahendran wrote:
>
> Same need here, using the Flink runner. We are processing a PCollection
> (extracting features per element), then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the group-by should be higher, to be balanced with the
> downstream operator.
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism per operator. As I mentioned
>> before, there may be multiple group-by and join operators in one pipeline,
>> and their parallelism can differ due to different input data sizes.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>> were to add it, it probably belongs in ResourceHints.
>>>>
>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>>>>> set. I believe this sets the default parallelism for all Flink operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>>>>>> kind of mechanism, so I am looking for a general solution on the beam side.
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> To a (small) degree Sparks “new” AQE might be able to help depending
>>>>>>> on what kind of operations Beam is compiling it down to.
>>>>>>>
>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>>>>> operators.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at compiling
>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the number of
>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be dynamically
>>>>>>>>>>>>>> split and moved around between workers, the number of workers will
>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the parallelism of
>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any way
>>>>>>>>>>>>>>> to set parallelism for individual operators like group by and join? I
>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying execution
>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by and join in
>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

This topic was discussed many years ago and the conclusion there was
that setting the parallelism of individual operators via
FlinkPipelineOptions (or ResourceHints) would be possible, but somewhat
cumbersome. Although I understand that it "feels" weird to have
high parallelism for operators with small inputs, does this actually
have any relevant performance impact? I always use parallelism based on
the largest operator in the Pipeline and this seems to work just fine.
Is there any particular need for such an approach, or a measurable
impact from it?
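A minimal sketch of the approach described above: derive one pipeline-wide parallelism from the largest operator. The operator names, input estimates, and the `records_per_subtask` target are hypothetical assumptions, not values from this thread; the result would be the single value passed as the Flink `parallelism` option.

```python
import math

# Hypothetical per-operator input estimates (records); not from the thread.
ESTIMATED_INPUTS = {
    "ReadEvents": 50_000_000,
    "GroupByUser": 2_000_000,
    "JoinProfiles": 500_000,
}


def parallelism_for(records: int, records_per_subtask: int) -> int:
    """Subtasks needed so each handles at most `records_per_subtask` records."""
    return max(1, math.ceil(records / records_per_subtask))


def pipeline_parallelism(estimates: dict, records_per_subtask: int) -> int:
    """One pipeline-wide value, sized for the largest operator."""
    return max(parallelism_for(n, records_per_subtask) for n in estimates.values())


p = pipeline_parallelism(ESTIMATED_INPUTS, records_per_subtask=1_000_000)
print(p)  # 50
```

Smaller operators (here `GroupByUser` and `JoinProfiles`) simply run with more subtasks than they strictly need, which is the trade-off being questioned above.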

  Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:
> Same need here, using the Flink runner. We are processing a PCollection
> (extracting features per element), then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Ning Kang via user <us...@beam.apache.org>.
Hi,

There is a Flink pipeline option, `parallelism`, that you can set:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L1504-L1510

This parallelism is applied to every step (there is no API to configure a
different value for each step). So if you have 10 steps and set the
parallelism to 10, there will be 100 tasks created. You can use
`max_parallelism` to limit the pipeline-wide parallelism.
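As a rough illustration, these settings can be passed as ordinary command-line flags. This sketch assumes the Python Flink runner flags `--parallelism` and `--max_parallelism` referred to in this message; the helper `flink_parallelism_args` and the values 10 and 128 are hypothetical. Parsing through Beam is attempted only if `apache_beam` is installed.

```python
"""Sketch: one pipeline-wide Flink parallelism for a Beam Python pipeline."""


def flink_parallelism_args(parallelism, max_parallelism=None):
    """Hypothetical helper that builds flags for the Flink runner.

    The same `parallelism` applies to every step; there is no per-step value.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    args = ["--runner=FlinkRunner", f"--parallelism={parallelism}"]
    if max_parallelism is not None:
        # Flink does not allow a parallelism above the max parallelism.
        if max_parallelism < parallelism:
            raise ValueError("max_parallelism must be >= parallelism")
        args.append(f"--max_parallelism={max_parallelism}")
    return args


args = flink_parallelism_args(10, max_parallelism=128)
print(args)

try:
    from apache_beam.options.pipeline_options import PipelineOptions
except ImportError:  # Beam not installed: the flags alone are the point here.
    PipelineOptions = None

if PipelineOptions is not None:
    # get_all_options() returns a dict of every option Beam knows about.
    options = PipelineOptions(args).get_all_options()
    print(options.get("parallelism"), options.get("max_parallelism"))
```

In a real job the same list (or the equivalent flags on the command line) would be handed to `PipelineOptions` when constructing the pipeline.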

The reason to limit max_parallelism is that a Flink cluster might run into
network issues when there are too many tasks running in parallel. If you
control the cluster's creation, you can also configure the Flink cluster (an
example
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py#L56-L74>)
to allocate more resources to the task managers, increasing their capacity
for concurrent tasks. This is specific to Flink; you can find more guidance
in Flink's documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
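For a rough capacity check: Flink runs subtasks in task slots, and a cluster offers (number of TaskManagers) x `taskmanager.numberOfTaskSlots` slots. Under Flink's default slot sharing, a job needs about as many slots as its highest operator parallelism. The cluster sizes below are hypothetical, and this is a simplified sketch rather than a full resource model.

```python
"""Sketch: does a pipeline-wide parallelism fit in a Flink cluster's slots?"""


def total_slots(num_task_managers: int, slots_per_task_manager: int) -> int:
    # Each TaskManager offers `taskmanager.numberOfTaskSlots` slots.
    return num_task_managers * slots_per_task_manager


def fits_in_cluster(parallelism: int, num_task_managers: int,
                    slots_per_task_manager: int) -> bool:
    # With default slot sharing, the job needs as many slots as its highest
    # operator parallelism (here, the single pipeline-wide value).
    return parallelism <= total_slots(num_task_managers, slots_per_task_manager)


# Hypothetical cluster: 3 TaskManagers with 4 slots each.
print(total_slots(3, 4))          # 12
print(fits_in_cluster(10, 3, 4))  # True
print(fits_in_cluster(16, 3, 4))  # False
```

Adding TaskManagers, or raising `taskmanager.numberOfTaskSlots` when each TaskManager has enough memory and CPU, is what increases this capacity.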

Ning.




On Wed, Apr 19, 2023 at 8:23 AM Nimalan Mahendran <
nimalan@liminalinsights.com> wrote:

> Same need here, using the Flink runner. We are processing a PCollection
> (extracting features per element), then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Nimalan Mahendran <ni...@liminalinsights.com>.
Same need here, using the Flink runner. We are processing a PCollection
(extracting features per element), then combining these into groups of
features and running the next operator on those groups.

Each group contains ~50 elements, so the parallelism of the operator
upstream of the groupby should be higher, to be balanced with the
downstream operator.
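The imbalance described above can be sketched with simple arithmetic: grouping ~50 elements per group leaves the downstream operator with about 1/50 as many work items, and a grouping step cannot keep more parallel instances busy than it has distinct keys. The element count, the parallelism, and the modulo partitioner are hypothetical stand-ins; real runners use their own key hashing.

```python
import math

GROUP_SIZE = 50  # ~50 elements per group, as in the message above


def count_groups(num_elements: int, group_size: int = GROUP_SIZE) -> int:
    return math.ceil(num_elements / group_size)


def busy_instances(num_keys: int, parallelism: int) -> int:
    """Instances that receive at least one key under a modulo partitioner.

    Integer keys 0..num_keys-1 stand in for group keys; `key % parallelism`
    is a simplified stand-in for the runner's real key partitioning.
    """
    return len({key % parallelism for key in range(num_keys)})


elements = 2_000   # hypothetical input size
parallelism = 100  # one pipeline-wide value
groups = count_groups(elements)
print(groups)                               # 40
print(math.ceil(elements / parallelism))    # 20 elements per upstream instance
print(busy_instances(groups, parallelism))  # 40 of 100 downstream instances busy
```

With a single pipeline-wide parallelism, the upstream per-element step uses all 100 instances while at most 40 downstream instances ever receive a group.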

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zj...@gmail.com> wrote:

> Hi Reuven,
>
> It would be better to set parallelism per operator. As I mentioned
> before, there may be multiple groupby and join operators in one pipeline,
> and their parallelism can differ because their input data sizes differ.

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jeff Zhang <zj...@gmail.com>.
Hi Reuven,

It would be better to set parallelism per operator. As I mentioned before,
there may be multiple groupby and join operators in one pipeline, and their
parallelism can differ because their input data sizes differ.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:

> Jeff - does setting the global default work for you, or do you need
> per-operator control? Seems like it would be to add this to ResourceHints.
>

-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Reuven Lax via user <us...@beam.apache.org>.
Jeff, does setting the global default work for you, or do you need
per-operator control? It seems like this could be added to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <ro...@google.com>
wrote:

> Yeah, I don't think we have a good per-operator API for this. If we were
> to add it, it probably belongs in ResourceHints.
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
Yeah, I don't think we have a good per-operator API for this. If we were to
add it, it probably belongs in ResourceHints.
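Beam's ResourceHints had no parallelism hint at the time of this thread, so the following is purely a hypothetical sketch of what a per-transform hint might look like. It also shows one way to resolve it when a runner fuses several transforms into a single stage and must pick one value. `ParallelismHint`, `resolve_fused_parallelism`, and the "largest request wins" policy are all assumptions, not Beam APIs.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

@dataclass(frozen=True)
class ParallelismHint:
    """Hypothetical per-transform hint; NOT part of Beam's ResourceHints."""
    transform_name: str
    parallelism: Optional[int] = None  # None means "no preference"

def resolve_fused_parallelism(hints: Iterable[ParallelismHint],
                              global_default: int) -> int:
    """Pick one parallelism for a fused stage.

    Assumed policy: the largest explicit request wins; if no transform in
    the stage asked for anything, fall back to the pipeline-wide default.
    """
    requested = [h.parallelism for h in hints if h.parallelism is not None]
    for p in requested:
        if p <= 0:
            raise ValueError(f"parallelism must be positive, got {p}")
    return max(requested) if requested else global_default

# A hypothetical fused stage containing three transforms.
stage = [
    ParallelismHint("ParseEvents"),
    ParallelismHint("GroupByUser", parallelism=64),
    ParallelismHint("JoinProfiles", parallelism=16),
]
```

"Max wins" is only one choice. A runner could instead refuse to fuse transforms whose hints disagree, at the cost of extra materialization between stages.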

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:

> Looking at FlinkPipelineOptions, there is a parallelism option you can
> set. I believe this sets the default parallelism for all Flink operators.
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Reuven Lax via user <us...@beam.apache.org>.
Looking at FlinkPipelineOptions, there is a parallelism option you can set.
I believe this sets the default parallelism for all Flink operators.
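For reference, that global default is normally passed as the `--parallelism` pipeline option on the command line. The sketch below only builds the argument list. `flink_pipeline_args` is a hypothetical helper, and it assumes the list is later handed to the SDK's options parser.

```python
from typing import List, Sequence

def flink_pipeline_args(parallelism: int, extra: Sequence[str] = ()) -> List[str]:
    """Build Flink runner flags that set a pipeline-wide default parallelism.

    Hypothetical helper: it does not contact Flink, it only formats flags.
    The value is a single default for every operator, not a per-transform
    setting.
    """
    if parallelism <= 0:
        raise ValueError("parallelism must be a positive integer")
    return ["--runner=FlinkRunner", f"--parallelism={parallelism}", *extra]

args = flink_pipeline_args(8)
```

In Java these flags should surface through `FlinkPipelineOptions.getParallelism()`. In the Beam Python SDK they should be readable via `PipelineOptions(args).view_as(FlinkRunnerOptions).parallelism`.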

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zj...@gmail.com> wrote:

> Thanks Holden. This would work for Spark, but Flink doesn't have such a
> mechanism, so I am looking for a general solution on the Beam side.
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jeff Zhang <zj...@gmail.com>.
Thanks Holden. This would work for Spark, but Flink doesn't have such a
mechanism, so I am looking for a general solution on the Beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <ho...@pigscanfly.ca> wrote:

> To a (small) degree, Spark's “new” AQE (Adaptive Query Execution) might be
> able to help, depending on what kind of operations Beam is compiling it down to.
>
> Have you tried setting spark.sql.adaptive.enabled &
> spark.sql.adaptive.coalescePartitions.enabled?


-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Holden Karau <ho...@pigscanfly.ca>.
To a (small) degree, Spark's “new” AQE (Adaptive Query Execution) might be
able to help, depending on what kind of operations Beam is compiling it down to.

Have you tried setting spark.sql.adaptive.enabled &
spark.sql.adaptive.coalescePartitions.enabled?
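This is a minimal sketch of passing those two settings, assuming the Beam pipeline is launched with `spark-submit`. The keys are standard Spark SQL configuration properties; the helper `spark_submit_conf_args` is hypothetical. AQE is a Spark SQL feature, so whether it changes anything depends on whether the runner's translation goes through Spark SQL/Dataset execution at all.

```python
from typing import Dict, List

# Spark SQL Adaptive Query Execution (AQE) settings mentioned above.
AQE_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}

def spark_submit_conf_args(conf: Dict[str, str]) -> List[str]:
    """Render a dict as repeated `--conf key=value` arguments for spark-submit."""
    args: List[str] = []
    for key, value in sorted(conf.items()):  # sorted for a stable order
        args += ["--conf", f"{key}={value}"]
    return args
```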



On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <us...@beam.apache.org>
wrote:

> I see. Robert - what is the story for parallelism controls on GBK with the
> Spark or Flink runners?
>
--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Reuven Lax via user <us...@beam.apache.org>.
I see. Robert - what is the story for parallelism controls on GBK with the
Spark or Flink runners?
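To make the GBK question concrete: a GroupByKey is typically executed by hash-partitioning keys into N partitions. As a result, its useful parallelism is bounded by the number of distinct keys, no matter how large N is. The following is a toy stdlib sketch of that effect. It uses CRC32 as a hypothetical partitioning hash; real runners use their own key encodings and hash functions.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int]

def hash_partition(records: Iterable[Record],
                   num_partitions: int) -> Dict[int, List[Record]]:
    """Assign each (key, value) record to a partition by hashing its key.

    Only non-empty partitions appear in the result.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    partitions: Dict[int, List[Record]] = defaultdict(list)
    for key, value in records:
        # CRC32 is deterministic across processes, unlike Python's salted hash().
        partitions[zlib.crc32(key.encode("utf-8")) % num_partitions].append((key, value))
    return dict(partitions)

records = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
busy = hash_partition(records, num_partitions=8)
```

Even with 8 partitions requested, at most 3 of them receive any work, because there are only 3 distinct keys.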

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zj...@gmail.com> wrote:

> No, I don't use Dataflow; I use Spark & Flink.

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jeff Zhang <zj...@gmail.com>.
No, I don't use Dataflow; I use Spark & Flink.


On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com> wrote:

> Are you running on the Dataflow runner? If so, Dataflow, unlike Spark and
> Flink, dynamically modifies the parallelism as the operator runs, so there
> is no need for such controls. In fact, these specific controls wouldn't
> make much sense for the way Dataflow implements these operators.
>
> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com> wrote:
>
>> Just for performance tuning like in Spark and Flink.
>>
>>
>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>> What are you trying to achieve by setting the parallelism?
>>>
>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>
>>>> Thanks Reuven, what I mean is to set the parallelism in operator level.
>>>> And the input size of the operator is unknown at compiling stage if it is
>>>> not a source
>>>>  operator,
>>>>
>>>> Here's an example of flink
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>> Spark also support to set operator level parallelism (see groupByKey
>>>> and reduceByKey):
>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>
>>>>
>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> The maximum parallelism is always determined by the parallelism of
>>>>> your data. If you do a GroupByKey for example, the number of keys in your
>>>>> data determines the maximum parallelism.
>>>>>
>>>>> Beyond the limitations in your data, it depends on your execution
>>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>>> around between workers, the number of workers will autoscale, etc.), so
>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>
>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>
>>>>>> Besides the global parallelism of a Beam job, is there any way to set
>>>>>> the parallelism of individual operators like group by and join? I
>>>>>> understand that the parallelism setting depends on the underlying
>>>>>> execution engine, but setting per-operator parallelism for group by and
>>>>>> join is very common in both Spark and Flink.
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Reuven Lax via user <us...@beam.apache.org>.
Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and
Flink - dynamically modifies the parallelism as the operator runs, so there
is no need to have such controls. In fact these specific controls wouldn't
make much sense for the way Dataflow implements these operators.
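A heavily simplified sketch of the idea behind dynamic splitting. Assumption: this is a toy model, not Dataflow's actual work-rebalancing protocol, and WorkRange, try_split and rebalance are hypothetical names. While the stage runs, a busy worker's unprocessed range is cut in half and the remainder is handed to an idle worker. The result is that the degree of parallelism is decided at runtime rather than fixed up front by a per-operator setting.

```python
from dataclasses import dataclass


@dataclass
class WorkRange:
    """Toy work unit: half-open range [start, end) of record offsets."""
    start: int
    end: int
    position: int  # next offset this worker will process

    def remaining(self):
        return self.end - self.position

    def try_split(self):
        """Give away the second half of the unprocessed work.

        Returns a new range for an idle worker, or None if too little
        work is left to be worth splitting.
        """
        if self.remaining() < 2:
            return None
        split_point = self.position + self.remaining() // 2
        residual = WorkRange(split_point, self.end, split_point)
        self.end = split_point  # this worker keeps only the first half
        return residual


def rebalance(ranges, idle_workers):
    """Hypothetical helper: feed idle workers by splitting the busiest range."""
    for _ in range(idle_workers):
        busiest = max(ranges, key=lambda r: r.remaining())
        residual = busiest.try_split()
        if residual is None:
            break  # nothing left that can usefully be split
        ranges.append(residual)
    return ranges


# One worker is 40 records into a 100-record range; two idle workers join.
ranges = rebalance([WorkRange(0, 100, 40)], 2)
print([(r.position, r.end) for r in ranges])
```

No parallelism value is configured anywhere in this model: the number of concurrently processed ranges grows as idle workers appear, which is the reason per-operator parallelism controls would not map cleanly onto such a runner.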

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zj...@gmail.com> wrote:

> Just for performance tuning, as in Spark and Flink.
>
>
> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> What are you trying to achieve by setting the parallelism?
>>
>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com> wrote:
>>
>>> Thanks Reuven. What I mean is setting the parallelism at the operator level.
>>> Also, the input size of an operator is unknown at compile time unless it
>>> is a source operator.
>>>
>>> Here's an example from Flink:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>> Spark also supports setting operator-level parallelism (see groupByKey
>>> and reduceByKey):
>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> The maximum parallelism is always determined by the parallelism of your
>>>> data. If you do a GroupByKey for example, the number of keys in your data
>>>> determines the maximum parallelism.
>>>>
>>>> Beyond the limitations in your data, it depends on your execution
>>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>>> determine the parallelism (e.g. work will be dynamically split and moved
>>>> around between workers, the number of workers will autoscale, etc.), so
>>>> there's no need to explicitly set the parallelism of the execution.
>>>>
>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:
>>>>
>>>>> Besides the global parallelism of a Beam job, is there any way to set
>>>>> the parallelism of individual operators like group by and join? I
>>>>> understand that the parallelism setting depends on the underlying
>>>>> execution engine, but setting per-operator parallelism for group by and
>>>>> join is very common in both Spark and Flink.
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jeff Zhang <zj...@gmail.com>.
Just for performance tuning, as in Spark and Flink.


On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> What are you trying to achieve by setting the parallelism?
>
> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com> wrote:
>
>> Thanks Reuven. What I mean is setting the parallelism at the operator level.
>> Also, the input size of an operator is unknown at compile time unless it
>> is a source operator.
>>
>> Here's an example from Flink:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>> Spark also supports setting operator-level parallelism (see groupByKey and
>> reduceByKey):
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>
>>
>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <us...@beam.apache.org>
>> wrote:
>>
>>> The maximum parallelism is always determined by the parallelism of your
>>> data. If you do a GroupByKey for example, the number of keys in your data
>>> determines the maximum parallelism.
>>>
>>> Beyond the limitations in your data, it depends on your execution
>>> engine. If you're using Dataflow, Dataflow is designed to automatically
>>> determine the parallelism (e.g. work will be dynamically split and moved
>>> around between workers, the number of workers will autoscale, etc.), so
>>> there's no need to explicitly set the parallelism of the execution.
>>>
>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:
>>>
>>>> Besides the global parallelism of a Beam job, is there any way to set
>>>> the parallelism of individual operators like group by and join? I
>>>> understand that the parallelism setting depends on the underlying
>>>> execution engine, but setting per-operator parallelism for group by and
>>>> join is very common in both Spark and Flink.
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zj...@gmail.com> wrote:

> Thanks Reuven. What I mean is setting the parallelism at the operator level.
> Also, the input size of an operator is unknown at compile time unless it
> is a source operator.
>
> Here's an example from Flink:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
> Spark also supports setting operator-level parallelism (see groupByKey and
> reduceByKey):
> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>
>
> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <us...@beam.apache.org>
> wrote:
>
>> The maximum parallelism is always determined by the parallelism of your
>> data. If you do a GroupByKey for example, the number of keys in your data
>> determines the maximum parallelism.
>>
>> Beyond the limitations in your data, it depends on your execution engine.
>> If you're using Dataflow, Dataflow is designed to automatically determine
>> the parallelism (e.g. work will be dynamically split and moved around
>> between workers, the number of workers will autoscale, etc.), so there's no
>> need to explicitly set the parallelism of the execution.
>>
>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:
>>
>>> Besides the global parallelism of a Beam job, is there any way to set
>>> the parallelism of individual operators like group by and join? I
>>> understand that the parallelism setting depends on the underlying
>>> execution engine, but setting per-operator parallelism for group by and
>>> join is very common in both Spark and Flink.
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Jeff Zhang <zj...@gmail.com>.
Thanks Reuven. What I mean is setting the parallelism at the operator level.
Also, the input size of an operator is unknown at compile time unless it is
a source operator.

Here's an example from Flink:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
Spark also supports setting operator-level parallelism (see groupByKey and
reduceByKey):
https://spark.apache.org/docs/latest/rdd-programming-guide.html
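For reference, the knobs being described are setParallelism(n) on an individual Flink DataStream operator, and the optional numPartitions argument in Spark, e.g. rdd.groupByKey(numPartitions=8) in PySpark. The sketch below is a plain-Python simulation of what such an argument controls, not Spark's or Flink's implementation. The name group_by_key is hypothetical, and crc32 stands in for the engine's own hash function. Keys are routed to num_partitions buckets by hash, so the value sets how many downstream tasks this one stage gets without touching the rest of the job.

```python
import zlib
from collections import defaultdict


def group_by_key(records, num_partitions):
    """Toy shuffle: route each key to bucket hash(key) % num_partitions,
    then collect the values for each key within its bucket."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in records:
        # crc32 is stable across processes, unlike Python's salted str hash().
        index = zlib.crc32(str(key).encode("utf-8")) % num_partitions
        partitions[index][key].append(value)
    return [dict(p) for p in partitions]


# The per-operator knob: this grouping stage gets 4 partitions (tasks).
parts = group_by_key([("a", 1), ("b", 2), ("a", 3)], 4)
print(len(parts))
```

Because every value for a key lands in the same bucket, raising num_partitions beyond the number of distinct keys only creates empty partitions.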


On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <us...@beam.apache.org>
wrote:

> The maximum parallelism is always determined by the parallelism of your
> data. If you do a GroupByKey for example, the number of keys in your data
> determines the maximum parallelism.
>
> Beyond the limitations in your data, it depends on your execution engine.
> If you're using Dataflow, Dataflow is designed to automatically determine
> the parallelism (e.g. work will be dynamically split and moved around
> between workers, the number of workers will autoscale, etc.), so there's no
> need to explicitly set the parallelism of the execution.
>
> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:
>
>> Besides the global parallelism of a Beam job, is there any way to set
>> the parallelism of individual operators like group by and join? I
>> understand that the parallelism setting depends on the underlying
>> execution engine, but setting per-operator parallelism for group by and
>> join is very common in both Spark and Flink.
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang

Re: Is there any way to set the parallelism of operators like group by, join?

Posted by Reuven Lax via user <us...@beam.apache.org>.
The maximum parallelism is always determined by the parallelism of your
data. If you do a GroupByKey for example, the number of keys in your data
determines the maximum parallelism.

Beyond the limitations in your data, it depends on your execution engine.
If you're using Dataflow, Dataflow is designed to automatically determine
the parallelism (e.g. work will be dynamically split and moved around
between workers, the number of workers will autoscale, etc.), so there's no
need to explicitly set the parallelism of the execution.
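A toy model of the key-count limit described above, in plain Python. This is not Beam code, and the function name groupbykey_parallelism is hypothetical. All values for a key must be grouped together, so each distinct key is an indivisible unit of work. Requesting more workers than there are keys therefore adds nothing, and the most frequent key bounds how quickly the stage can finish.

```python
from collections import Counter


def groupbykey_parallelism(records, requested_workers):
    """Return (usable_workers, largest_group) for a GroupByKey-style stage.

    usable_workers: workers that can receive at least one key.
    largest_group: values held by the busiest key, which one worker
    must process on its own regardless of how many workers exist.
    """
    counts = Counter(key for key, _ in records)
    usable_workers = min(requested_workers, len(counts))
    largest_group = max(counts.values(), default=0)
    return usable_workers, largest_group


records = [("a", 1), ("a", 2), ("a", 3), ("b", 4)]
# Asking for 100 workers still yields at most 2 busy ones (keys "a" and "b").
print(groupbykey_parallelism(records, 100))
```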

On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zj...@gmail.com> wrote:

> Besides the global parallelism of a Beam job, is there any way to set
> the parallelism of individual operators like group by and join? I
> understand that the parallelism setting depends on the underlying
> execution engine, but setting per-operator parallelism for group by and
> join is very common in both Spark and Flink.
>
> --
> Best Regards
>
> Jeff Zhang
>