Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2019/10/22 08:00:09 UTC

Monitor number of keys per Taskmanager

Hi to all,
I was looking into the ClickEventCount example[1] from the Flink training,
trying to understand why one task manager was reading at twice the speed
of the other.

I had to debug a lot of Flink's internal code to find out that it depends
on the hash function Flink uses to assign keys to taskmanagers, which was
assigning 4 keys to one TM and 2 to the other. Is there a smarter way to
monitor this (e.g. a metric like taskManager_numKeys)?
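
To make the key-to-taskmanager assignment described above concrete, here is a minimal sketch. It assumes a two-step scheme: a key is first hashed into one of maxParallelism key groups, and each parallel subtask then owns a contiguous range of key groups. The class name, method names and sample keys are hypothetical, and the mix function is a stand-in rather than the hash Flink actually uses, so the counts it prints will not necessarily match a real job.

```java
import java.util.Arrays;
import java.util.List;

final class KeySkewSketch {

    // Stand-in 32-bit mixer (the MurmurHash3 finalizer); NOT Flink's exact hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff; // clear the sign bit so the modulo below is non-negative
    }

    // Step 1 (assumed scheme): key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Step 2 (assumed scheme): key group -> subtask index in [0, parallelism),
    // where each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of the given keys land on each subtask.
    static int[] keysPerSubtask(List<?> keys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            counts[subtaskFor(keyGroup, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: six keys, 128 key groups, two subtasks.
        List<String> keys =
                Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
        System.out.println(Arrays.toString(keysPerSubtask(keys, 128, 2)));
    }
}
```

With only six keys, an uneven split such as 4 and 2 between two subtasks is an unremarkable outcome of hashing. Running the sketch with other maxParallelism values shows that the split can shift when the number of key groups changes.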

I also discovered that one cannot control how keys are partitioned across
taskmanagers (i.e. use keyBy after a customPartition). Is there any
development effort in this direction?

Best,
Flavio

[1]
https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java

Re: Monitor number of keys per Taskmanager

Posted by Flavio Pompermaier <po...@okkam.it>.
Thank you all for the replies. Maybe I could set up some metrics and count
the keys per subtask/slot myself.
However, in the playground example there are 6 keys and they get
distributed across the 2 slots as 4 and 2: is this a bug (since Piotr said
that key groups can have sizes +/- 1, and in this case the difference is 2)?
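
The do-it-yourself counting mentioned above could look roughly like the following sketch. It is a plain Java helper with no Flink dependency: DistinctKeyCounter and its methods are hypothetical names. In a job, one instance per parallel subtask could be kept in a rich function, with its value registered as a gauge through getRuntimeContext().getMetricGroup().gauge(...) under a metric name of your choosing. The set of seen keys is held in memory and is not checkpointed here, so this only suits small key spaces such as the six keys of the playground.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

final class DistinctKeyCounter<K> {

    // Keys observed so far by this instance (one instance per subtask is assumed).
    private final Set<K> seen = new HashSet<>();

    // Call once per record with the record's key.
    void observe(K key) {
        seen.add(key);
    }

    // Number of distinct keys observed so far.
    int distinctKeys() {
        return seen.size();
    }

    // A callback of the shape a gauge would poll for its current value.
    Supplier<Integer> asGauge() {
        return this::distinctKeys;
    }

    public static void main(String[] args) {
        DistinctKeyCounter<String> counter = new DistinctKeyCounter<>();
        for (String key : new String[] {"/help", "/index", "/help", "/shop"}) {
            counter.observe(key);
        }
        System.out.println("distinct keys seen: " + counter.asGauge().get());
    }
}
```

Comparing the reported value across subtasks would show a split such as 4 and 2 directly, without stepping through Flink's internals.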

On Wed, 23 Oct 2019, 18:54 Till Rohrmann <tr...@apache.org> wrote:

> Currently, we are not working on ensuring that key groups are spread as
> evenly as possible. As a workaround, I would suggest increasing the
> number of key groups or changing the key function.
>
> Cheers,
> Till
>
> On Wed, Oct 23, 2019 at 1:42 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> This is a known issue in Flink. For example, key groups can have sizes +/-
>> 1 and they are currently randomly distributed across the cluster, so some
>> machines will get more keys to handle than the others. If the number of
>> keys is relatively small, like 3 keys per key group, the load difference
>> can be quite large (some machines may get almost only key groups of size
>> 2 while others get mostly key groups of size 3, a 50% load difference).
>>
>> Unfortunately I don’t know about any concrete plans to address it. Maybe
>> Till will know something more (I CC’ed him).
>>
>> Also I don’t think it’s exposed via a metric anywhere.
>>
>> Piotrek

Re: Monitor number of keys per Taskmanager

Posted by Till Rohrmann <tr...@apache.org>.
Currently, we are not working on ensuring that key groups are spread as
evenly as possible. As a workaround, I would suggest increasing the
number of key groups or changing the key function.

Cheers,
Till


Re: Monitor number of keys per Taskmanager

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

This is a known issue in Flink. For example, key groups can have sizes +/- 1 and they are currently randomly distributed across the cluster, so some machines will get more keys to handle than the others. If the number of keys is relatively small, like 3 keys per key group, the load difference can be quite large (some machines may get almost only key groups of size 2 while others get mostly key groups of size 3, a 50% load difference).

Unfortunately I don’t know about any concrete plans to address it. Maybe Till will know something more (I CC’ed him).

Also I don’t think it’s exposed via a metric anywhere.

Piotrek
