Posted to user@flink.apache.org by Avi Levi <av...@bluevoyant.com> on 2018/11/26 06:03:08 UTC

Understanding Kafka connector - rebalance

Hi,
Looking at this example
<https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java>,
wouldn't doing the "rebalance" operation (e.g. messageStream.rebalance().map(...))
on a heavy-load stream slow the stream down? Does the rebalancing action
occur only when there is a partition change?
The example says "the rebelance call is causing a repartitioning of the data so
that all machines ...". Is it actually changing the number of partitions of the
topic to match the number of Flink operators?

Avi

Re: Understanding Kafka connector - rebalance

Posted by Avi Levi <av...@bluevoyant.com>.
Thanks, that makes sense!

(In reply to Fabian Hueske <fh...@gmail.com>, Mon, Nov 26, 2018 at 1:06 PM.)

Re: Understanding Kafka connector - rebalance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

DataStream<T> x = ...;
x.rebalance().keyBy(...);

is not a good idea.

It will first distribute the records round-robin (over the network) and
subsequently partition them by hash.
The first shuffle is unnecessary. It does not have any effect because it is
undone by the second partitioning.
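The redundancy described above can be made concrete with a small, self-contained Java model (plain Java, not Flink code). It counts one transfer per record per partitioning exchange and simplifies keyBy routing to Math.floorMod(key.hashCode(), parallelism); Flink actually maps keys to key groups first, but there too the target depends only on the key. The class name RebalanceThenKeyBy, the initial placement of records on source tasks, and the sample keys are hypothetical.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of rebalance().keyBy(...) versus keyBy(...) alone. */
final class RebalanceThenKeyBy {

    /** Final task of each record plus how many record transfers the exchanges performed. */
    static final class Result {
        final int[] finalTask;
        final int recordsShipped;

        Result(int[] finalTask, int recordsShipped) {
            this.finalTask = finalTask;
            this.recordsShipped = recordsShipped;
        }
    }

    // Simplified keyBy routing (assumption): the target depends only on the key.
    static int keyTarget(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Result run(String[] keys, int parallelism, boolean rebalanceFirst) {
        int[] task = new int[keys.length];
        int shipped = 0;
        for (int i = 0; i < keys.length; i++) {
            task[i] = i % parallelism;              // assumed task on which the source emitted record i
        }
        if (rebalanceFirst) {
            int[] next = new int[parallelism];      // one round-robin counter per sending task
            for (int i = 0; i < keys.length; i++) {
                int sender = task[i];
                task[i] = next[sender];
                next[sender] = (next[sender] + 1) % parallelism;
                shipped++;
            }
        }
        for (int i = 0; i < keys.length; i++) {
            task[i] = keyTarget(keys[i], parallelism); // where the record came from is ignored
            shipped++;
        }
        return new Result(task, shipped);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "a", "d", "b", "e", "a"};
        Result withRebalance = run(keys, 4, true);
        Result keyByOnly = run(keys, 4, false);
        System.out.println(Arrays.equals(withRebalance.finalTask, keyByOnly.finalTask));
        System.out.println("records shipped: " + withRebalance.recordsShipped + " vs " + keyByOnly.recordsShipped);
    }
}
```

Running main prints true for the placement check and "records shipped: 16 vs 8": every key ends up on the same task either way, but the variant with rebalance() moves every record twice.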

Btw, no method on DataStream has any effect on Kafka topics or
partitions.
In the initially quoted example, we assume that the events of the original
DataStream are not evenly distributed among the parallel tasks. The
rebalance() call produces an even distribution, which is especially
important if the map() operation is heavy-weight / compute-intensive.
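A toy model of this point, in plain Java and not Flink's RebalancePartitioner. It assumes the source and map() run with the same parallelism, so without rebalance() each map() task only sees what its own source subtask emitted, and it assumes each sender's round-robin counter starts at channel 0 (Flink's implementation may start elsewhere, which only moves the remainders). The class name RebalanceLoad and the per-subtask counts are hypothetical.

```java
import java.util.Arrays;

/** Toy model (not Flink's RebalancePartitioner) of per-task load with and without rebalance(). */
final class RebalanceLoad {

    /** Without rebalance (forward, same parallelism): map task i sees only source subtask i's records. */
    static int[] forwardLoad(int[] emittedPerSourceTask) {
        return emittedPerSourceTask.clone();
    }

    /** With rebalance: each source subtask deals its records round-robin over all map tasks. */
    static int[] rebalanceLoad(int[] emittedPerSourceTask, int mapParallelism) {
        int[] load = new int[mapParallelism];
        for (int emitted : emittedPerSourceTask) {
            int next = 0; // assumption: every sender's round-robin counter starts at channel 0
            for (int r = 0; r < emitted; r++) {
                load[next]++;
                next = (next + 1) % mapParallelism;
            }
        }
        return load;
    }

    static int busiest(int[] load) {
        return Arrays.stream(load).max().orElse(0);
    }

    public static void main(String[] args) {
        int[] skewed = {900, 50, 50, 0}; // hypothetical records per source subtask
        System.out.println(Arrays.toString(forwardLoad(skewed)));      // [900, 50, 50, 0]
        System.out.println(Arrays.toString(rebalanceLoad(skewed, 4))); // [251, 251, 249, 249]
    }
}
```

With this skewed input, the busiest compute-heavy map() task drops from 900 records to 251, at the cost of one extra exchange per record.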

Best, Fabian

(In reply to Taher Koitawala <taher.koitawala@gslab.com>, Mon, Nov 26, 2018 at 10:59.)

Re: Understanding Kafka connector - rebalance

Posted by Taher Koitawala <ta...@gslab.com>.
You can use rebalance before keyBy, because rebalance returns a DataStream.
The API does not allow rebalance on the KeyedStream that keyBy returns,
so you are safe.
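What "does not allow" means can be pictured with a toy model. The classes ToyStream and ToyKeyedStream below are hypothetical stand-ins, not Flink's API. They assume Flink's KeyedStream behaves similarly: it extends DataStream, so calling rebalance() on it compiles, but the call fails with an UnsupportedOperationException when the pipeline is built. Check this against the Flink version you use.

```java
/** Hypothetical stand-in for a non-keyed stream; not Flink's DataStream. */
class ToyStream {
    final String plan;

    ToyStream(String plan) {
        this.plan = plan;
    }

    /** Returns a new plain stream, so keyBy(...) can follow it. */
    ToyStream rebalance() {
        return new ToyStream(plan + " -> rebalance");
    }

    ToyKeyedStream keyBy(String field) {
        return new ToyKeyedStream(plan + " -> keyBy(" + field + ")");
    }
}

/** Hypothetical stand-in for a keyed stream: still a ToyStream, but it refuses repartitioning. */
class ToyKeyedStream extends ToyStream {
    ToyKeyedStream(String plan) {
        super(plan);
    }

    @Override
    ToyStream rebalance() {
        // Type-checks (inherited method) but fails when called, so the key partitioning stays intact.
        throw new UnsupportedOperationException("Cannot override partitioning of a keyed stream");
    }
}
```

In this model, new ToyStream("source").rebalance().keyBy("id") builds the plan "source -> rebalance -> keyBy(id)", while calling rebalance() after keyBy("id") throws.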

(In reply to Avi Levi <avi.levi@bluevoyant.com>, Mon, Nov 26, 2018 at 2:25 PM.)

Re: Understanding Kafka connector - rebalance

Posted by Avi Levi <av...@bluevoyant.com>.
OK, thanks for the clarification. But if I use it with keyed state, the
stream is partitioned by key. Won't rebalancing shuffle this partitioning?
E.g.:
.addSource(source)
      .rebalance
      .keyBy(_.id)
      .mapWithState(...)


(In reply to Taher Koitawala <ta...@gslab.com>, Mon, Nov 26, 2018 at 8:32 AM.)

Re: Understanding Kafka connector - rebalance

Posted by Taher Koitawala <ta...@gslab.com>.
Hi Avi,
No, rebalance does not change the number of Kafka partitions.
Let's say you have 6 Kafka partitions and your Flink parallelism is 8. In
this case, rebalance will send records to all downstream operators in
a round-robin fashion.
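The 6-partition, parallelism-8 case as a small Java model (not the Kafka connector's code). It assumes each partition is read by exactly one source subtask and uses a simplified assignment, partition i to subtask i % parallelism; the real connector may offset the starting subtask, but with 6 partitions and 8 subtasks two source subtasks still receive no partition. Without rebalance(), operators chained to those idle subtasks would typically see no records; with it, all 8 downstream tasks share the work. The class and method names are hypothetical.

```java
import java.util.Arrays;

/** Toy model (not Flink or Kafka code) of Kafka partitions spread over source subtasks. */
final class PartitionsVsParallelism {

    /** Simplified assignment (assumption): partition i is read by subtask i % parallelism. */
    static int[] partitionsPerSubtask(int partitions, int parallelism) {
        int[] count = new int[parallelism];
        for (int partition = 0; partition < partitions; partition++) {
            count[partition % parallelism]++;
        }
        return count;
    }

    /** Number of source subtasks that were assigned no partition and therefore emit nothing. */
    static long idleSubtasks(int[] partitionsPerSubtask) {
        return Arrays.stream(partitionsPerSubtask).filter(n -> n == 0).count();
    }

    public static void main(String[] args) {
        int[] assignment = partitionsPerSubtask(6, 8);
        System.out.println(Arrays.toString(assignment)); // [1, 1, 1, 1, 1, 1, 0, 0]
        System.out.println(idleSubtasks(assignment));     // 2
    }
}
```

Note that rebalancing is applied to every record as it flows, not only when partitions change; the Kafka topic itself is left untouched.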

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


(In reply to Avi Levi <av...@bluevoyant.com>, Mon, Nov 26, 2018 at 11:33 AM.)