You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Isidoros Ioannou <ak...@gmail.com> on 2022/04/01 09:37:09 UTC

Unbalanced distribution of keyed stream to downstream parallel operators

Hello,

we ran a flink application version 1.13.2 that consists of a kafka source
with one partition so far
then we filter the data based on some conditions, mapped to POJOS and we
transform to a KeyedStream based on an accountId long property from the
POJO. The downstream operators are 10 CEP operators that run with
parallelism of 14 and the maxParallelism is set to the (operatorParallelism
* operatorParallelism).
As you see in the image attached the events are distributed unevenly so
some subtasks are busy and others are idle.
Is there any way to distribute evenly the load to the subtasks? Thank you
in advance.
[image: Capture.PNG]

Re: Unbalanced distribution of keyed stream to downstream parallel operators

Posted by Isidoros Ioannou <ak...@gmail.com>.
Hello Arvid ,
thank you for your reply.

Actually using a window to aggregate the events for a time period is not
applicable to my case since I need the records to be processed immediately.
Even if I could I still can not understand how I could forward
the aggregated events to lets say 2 parallel operators. The slot assignment
of the KeyGroup is done by flink. You mean key by again by a different
property so that the previous aggregate events get reassigned again to
operators. I apologize if my question is naive but I got a little confused.




Στις Δευ 4 Απρ 2022 στις 10:38 π.μ., ο/η Arvid Heise <ar...@apache.org>
έγραψε:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, consider you want to count events per users and 2 users are
> actually bots spamming lots of events accounting for 50% of all events.
> Then, you will always collect all events of each bot on one machine which
> limits scalability. You can, however, first aggregate all events per user
> per day (or any other way to subdivide). Then, the same bot can be
> processed in parallel and you then do an overall aggregation.
>
> If that's not possible, then your problem itself limits the scalability
> and you can only try to not get both bot users on the same machine (which
> can happen in 2). Then you can simply try to shift the key by adding
> constants to it and check if the distribution looks better. Have a look at
> KeyGroupRangeAssignment [1] to test that out without running Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>
> On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou <ak...@gmail.com>
> wrote:
>
>> Hello Qingsheng,
>>
>> thank you a lot for your answer.
>>
>> I will try to modify the key as you mentioned in your first assumption.
>> In case the second assumption is valid also, what would you propose to
>> remedy the situation? Try to experiment with different values of max
>> parallelism?
>>
>>
>> Στις Σάβ 2 Απρ 2022 στις 6:55 π.μ., ο/η Qingsheng Ren <re...@gmail.com>
>> έγραψε:
>>
>>> Hi Isidoros,
>>>
>>> Two assumptions in my mind:
>>>
>>> 1. Records are not evenly distributed across different keys, e.g. some
>>> accountId just has more events than others. If the record distribution is
>>> predicable, you can try to combine other fields or include more information
>>> into the key field to help balancing the distribution.
>>>
>>> 2. Keys themselves are not distributed evenly. In short the subtask ID
>>> that a key belongs to is calculated by murmurHash(key.hashCode()) %
>>> maxParallelism, so if the distribution of keys is quite strange, it’s
>>> possible that most keys drop into the same subtask with the algorithm
>>> above. AFAIK there isn't such kind of metric for monitoring number of keys
>>> in a subtask, but I think you can simply investigate it with a map function
>>> after keyBy.
>>>
>>> Hope this would be helpful!
>>>
>>> Qingsheng
>>>
>>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou <ak...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > we ran a flink application version 1.13.2 that consists of a kafka
>>> source with one partition so far
>>> > then we filter the data based on some conditions, mapped to POJOS and
>>> we transform to a KeyedStream based on an accountId long property from the
>>> POJO. The downstream operators are 10 CEP operators that run with
>>> parallelism of 14 and the maxParallelism is set to the (operatorParallelism
>>> * operatorParallelism).
>>> > As you see in the image attached the events are distributed unevenly
>>> so some subtasks are busy and others are idle.
>>> > Is there any way to distribute evenly the load to the subtasks? Thank
>>> you in advance.
>>> > <Capture.PNG>
>>> >
>>>
>>>

Re: Unbalanced distribution of keyed stream to downstream parallel operators

Posted by Arvid Heise <ar...@apache.org>.
You should create a histogram over the keys of the records. If you see a
skew, one way to go about it is to refine the key or split aggregations.

For example, consider you want to count events per users and 2 users are
actually bots spamming lots of events accounting for 50% of all events.
Then, you will always collect all events of each bot on one machine which
limits scalability. You can, however, first aggregate all events per user
per day (or any other way to subdivide). Then, the same bot can be
processed in parallel and you then do an overall aggregation.

If that's not possible, then your problem itself limits the scalability and
you can only try to not get both bot users on the same machine (which can
happen in 2). Then you can simply try to shift the key by adding constants
to it and check if the distribution looks better. Have a look at
KeyGroupRangeAssignment [1] to test that out without running Flink itself.

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java

On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou <ak...@gmail.com> wrote:

> Hello Qingsheng,
>
> thank you a lot for your answer.
>
> I will try to modify the key as you mentioned in your first assumption. In
> case the second assumption is valid also, what would you propose to remedy
> the situation? Try to experiment with different values of max parallelism?
>
>
> Στις Σάβ 2 Απρ 2022 στις 6:55 π.μ., ο/η Qingsheng Ren <re...@gmail.com>
> έγραψε:
>
>> Hi Isidoros,
>>
>> Two assumptions in my mind:
>>
>> 1. Records are not evenly distributed across different keys, e.g. some
>> accountId just has more events than others. If the record distribution is
>> predicable, you can try to combine other fields or include more information
>> into the key field to help balancing the distribution.
>>
>> 2. Keys themselves are not distributed evenly. In short the subtask ID
>> that a key belongs to is calculated by murmurHash(key.hashCode()) %
>> maxParallelism, so if the distribution of keys is quite strange, it’s
>> possible that most keys drop into the same subtask with the algorithm
>> above. AFAIK there isn't such kind of metric for monitoring number of keys
>> in a subtask, but I think you can simply investigate it with a map function
>> after keyBy.
>>
>> Hope this would be helpful!
>>
>> Qingsheng
>>
>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou <ak...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > we ran a flink application version 1.13.2 that consists of a kafka
>> source with one partition so far
>> > then we filter the data based on some conditions, mapped to POJOS and
>> we transform to a KeyedStream based on an accountId long property from the
>> POJO. The downstream operators are 10 CEP operators that run with
>> parallelism of 14 and the maxParallelism is set to the (operatorParallelism
>> * operatorParallelism).
>> > As you see in the image attached the events are distributed unevenly so
>> some subtasks are busy and others are idle.
>> > Is there any way to distribute evenly the load to the subtasks? Thank
>> you in advance.
>> > <Capture.PNG>
>> >
>>
>>

Re: Unbalanced distribution of keyed stream to downstream parallel operators

Posted by Isidoros Ioannou <ak...@gmail.com>.
Hello Qingsheng,

thank you a lot for your answer.

I will try to modify the key as you mentioned in your first assumption. In
case the second assumption is valid also, what would you propose to remedy
the situation? Try to experiment with different values of max parallelism?


Στις Σάβ 2 Απρ 2022 στις 6:55 π.μ., ο/η Qingsheng Ren <re...@gmail.com>
έγραψε:

> Hi Isidoros,
>
> Two assumptions in my mind:
>
> 1. Records are not evenly distributed across different keys, e.g. some
> accountId just has more events than others. If the record distribution is
> predicable, you can try to combine other fields or include more information
> into the key field to help balancing the distribution.
>
> 2. Keys themselves are not distributed evenly. In short the subtask ID
> that a key belongs to is calculated by murmurHash(key.hashCode()) %
> maxParallelism, so if the distribution of keys is quite strange, it’s
> possible that most keys drop into the same subtask with the algorithm
> above. AFAIK there isn't such kind of metric for monitoring number of keys
> in a subtask, but I think you can simply investigate it with a map function
> after keyBy.
>
> Hope this would be helpful!
>
> Qingsheng
>
> > On Apr 1, 2022, at 17:37, Isidoros Ioannou <ak...@gmail.com> wrote:
> >
> > Hello,
> >
> > we ran a flink application version 1.13.2 that consists of a kafka
> source with one partition so far
> > then we filter the data based on some conditions, mapped to POJOS and we
> transform to a KeyedStream based on an accountId long property from the
> POJO. The downstream operators are 10 CEP operators that run with
> parallelism of 14 and the maxParallelism is set to the (operatorParallelism
> * operatorParallelism).
> > As you see in the image attached the events are distributed unevenly so
> some subtasks are busy and others are idle.
> > Is there any way to distribute evenly the load to the subtasks? Thank
> you in advance.
> > <Capture.PNG>
> >
>
>

Re: Unbalanced distribution of keyed stream to downstream parallel operators

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Isidoros,

Two assumptions in my mind: 

1. Records are not evenly distributed across different keys, e.g. some accountId just has more events than others. If the record distribution is predicable, you can try to combine other fields or include more information into the key field to help balancing the distribution. 

2. Keys themselves are not distributed evenly. In short the subtask ID that a key belongs to is calculated by murmurHash(key.hashCode()) % maxParallelism, so if the distribution of keys is quite strange, it’s possible that most keys drop into the same subtask with the algorithm above. AFAIK there isn't such kind of metric for monitoring number of keys in a subtask, but I think you can simply investigate it with a map function after keyBy. 

Hope this would be helpful!

Qingsheng

> On Apr 1, 2022, at 17:37, Isidoros Ioannou <ak...@gmail.com> wrote:
> 
> Hello,
> 
> we ran a flink application version 1.13.2 that consists of a kafka source with one partition so far
> then we filter the data based on some conditions, mapped to POJOS and we transform to a KeyedStream based on an accountId long property from the POJO. The downstream operators are 10 CEP operators that run with parallelism of 14 and the maxParallelism is set to the (operatorParallelism * operatorParallelism).
> As you see in the image attached the events are distributed unevenly so some subtasks are busy and others are idle.
> Is there any way to distribute evenly the load to the subtasks? Thank you in advance.
> <Capture.PNG>
>