Posted to user@flink.apache.org by Vishal Surana <vi...@moengage.com> on 2022/06/29 12:55:41 UTC

Optimizing parallelism in reactive mode with adaptive scaling

I have a job with about 10 operators, 3 of which are heavyweight. I
understand that the current implementation of autoscaling offers
essentially no configurability besides max parallelism. That is
practically useless, as my operators will inevitably choke if one of the 3
heavy ones ends up with insufficient slots. I have explored the following:

   1. Setting a very high max parallelism for the heaviest operator, in the
   hope that Flink would use this signal when allocating subtasks. This
   doesn't work.
   2. Using slot sharing to group 2 of the 3 operators, while giving the
   remaining one its own slot sharing group, in the hope that this would
   free up more slots (roughly as in the sketch after this list). Both are
   stateful operators with RocksDB as the state backend. However, despite
   being given the same slot sharing group name, they're scheduled
   independently, and each of the three (successive) operators ends up with
   exactly the same parallelism no matter how many task managers are
   running. I say slot sharing doesn't work because if it did, there would
   have been more available slots. It is curious that Flink ends up
   allocating an identical number of slots to each.
   3. With slot sharing enabled, my other jobs can work with very few
   slots. In this job, I see the opposite. For instance, if I spin up 20
   task managers, each with 16 slots, then there are 320 available slots.
   However, once the job starts, the job itself reports ~275 slots in use,
   and the number of available slots in the GUI is 0. I have verified that
   275 is the correct number by counting the subtasks of each operator. How
   can that be? Where are the remaining slots?
   4. While the data is partitioned by a hash function that ought to
   distribute it more or less uniformly across subtasks, I can see that
   some subtasks are overloaded while others aren't. Does Flink deliberately
   avoid distributing load uniformly for some reason, possibly to reduce
   network traffic? Is there a way to disable such behavior?
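
For reference, here is roughly how I'm wiring it up (Event, the process
functions, and the group names below are placeholders, not my actual code):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// heavy-1 and heavy-2 are meant to share one slot sharing group...
SingleOutputStreamOperator<Event> heavy1 = env
        .fromSource(source, watermarks, "source")   // source setup omitted
        .keyBy(Event::getKey)
        .process(new MyHeavyFn1())
        .name("heavy-1")
        .slotSharingGroup("heavy-pair");

SingleOutputStreamOperator<Event> heavy2 = heavy1
        .keyBy(Event::getKey)
        .process(new MyHeavyFn2())
        .name("heavy-2")
        .slotSharingGroup("heavy-pair");

// ...while heavy-3 gets its own group plus a very high max parallelism
SingleOutputStreamOperator<Event> heavy3 = heavy2
        .keyBy(Event::getKey)
        .process(new MyHeavyFn3())
        .name("heavy-3")
        .setMaxParallelism(720)
        .slotSharingGroup("heavy-solo");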

I'm running Flink 1.13.5, but I didn't see any related changes in more
recent Flink versions.

Thanks a lot!

--
Vishal

Re: Optimizing parallelism in reactive mode with adaptive scaling

Posted by Vishal Surana <vi...@moengage.com>.
I've attached a screenshot of the job that highlights the "missing slots".
[image: Screenshot 2022-06-29 at 9.38.54 PM.png]

Coming to slot sharing, it doesn't seem to be honored. It doesn't matter
whether I put 2 or all 3 of the heavyweight operators in one slot sharing
group; Flink simply ignores the grouping, assigning each of them an
identical number of slots rather than co-locating them in the shared
group. I say this because the number in the screenshot (145 slots) matches
what you get by counting every slot individually.

Perhaps you can tell me what I'm doing wrong. I've asked for many more
slots (~350), but the job manager isn't able to provide them, presumably
because there aren't any available.

Thanks for your reply!

-Vishal

On Wed, Jun 29, 2022 at 7:30 PM Weihua Hu <hu...@gmail.com> wrote:

> Hi, Vishal
>
> Reactive mode adjusts the parallelism of tasks based on the slots
> available in the cluster; it will not allocate new workers
> automatically. [1]
> 1. Max parallelism only caps how far the parallelism of a task can scale
> up; it does not affect how tasks are scheduled.
> 2. Flink enables slot sharing by default; splitting tasks across two
> slot sharing groups will increase the total number of slots required.
> 3. Are you sure there are 20 task managers running? Could you share a
> screenshot of your job UI?
> 4. That depends on whether your data contains hot keys; if all the
> hot-key data is routed to the same subtask, that subtask will be
> overloaded.
>
> Simply giving this job more task managers and slots may relieve the
> heavy operators.
>
>
> [1] https://flink.apache.org/2021/05/06/reactive-mode.html
>
> Best,
> Weihua

-- 
Regards,
Vishal

Re: Optimizing parallelism in reactive mode with adaptive scaling

Posted by Weihua Hu <hu...@gmail.com>.
Hi, Vishal

Reactive mode adjusts the parallelism of tasks based on the slots
available in the cluster; it will not allocate new workers
automatically. [1]
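
For context, reactive mode is switched on with a single option in
flink-conf.yaml and currently requires a standalone application cluster;
the job then rescales only when task managers are added or removed:

# flink-conf.yaml
scheduler-mode: reactive
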
1. Max parallelism only caps how far the parallelism of a task can scale
up; it does not affect how tasks are scheduled.
2. Flink enables slot sharing by default; splitting tasks across two slot
sharing groups will increase the total number of slots required (see the
example after these points).
3. Are you sure there are 20 task managers running? Could you share a
screenshot of your job UI?
4. That depends on whether your data contains hot keys; if all the hot-key
data is routed to the same subtask, that subtask will be overloaded (see
the sketch below).
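
To put numbers on point 2 (illustrative only): a slot sharing group needs
as many slots as the highest parallelism among its tasks, and the job
needs the sum across groups. Three chained operators at parallelism 160
inside one shared group need 160 slots in total; move one of them into its
own group at the same parallelism and the job now needs 160 + 160 = 320
slots.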

Simply giving this job more task managers and slots may relieve the heavy
operators.
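
If hot keys turn out to be the cause, one common workaround is to salt the
key so that a single hot key spreads over several subtasks, then combine
the partial results in a second step. Flink does not do this for you, and
the sketch below is only an illustration (events, Event, and getKey() are
placeholders):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

int salts = 8;  // fan-out factor: one hot key spreads over up to 8 subtasks

// Attach a random salt in a map() step; the salt must be part of the
// record because keyBy selectors have to be deterministic.
DataStream<Tuple3<String, Integer, Long>> salted = events
        .map(e -> Tuple3.of(e.getKey(), ThreadLocalRandom.current().nextInt(salts), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

// Stage 1: pre-aggregate per (key, salt), so the load of a hot key is
// split across `salts` subtasks.
DataStream<Tuple2<String, Long>> partial = salted
        .keyBy(t -> t.f0 + "#" + t.f1)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
        .map(t -> Tuple2.of(t.f0, t.f2))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));

// Stage 2: merge the partial counts back per original key; each key now
// receives at most `salts` records per window instead of the full stream.
DataStream<Tuple2<String, Long>> total = partial
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));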


[1] https://flink.apache.org/2021/05/06/reactive-mode.html

Best,
Weihua

