Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2019/09/19 08:48:45 UTC

What is the right way to use the physical partitioning strategy in Data Streams?

I am executing a data stream application which uses rebalance. Basically, I am
counting words using "src -> split -> physicalPartitionStrategy -> keyBy
-> sum -> print". I am running 3 examples: one without any physical
partition strategy, one with the rebalance strategy [1], and one with the
partial partition strategy from [2].
I know that the keyBy operator effectively undoes what rebalance did, because
it repartitions the stream by key, and if the stream has skewed keys,
one parallel instance of the operator after the keyBy will be overloaded.
However, I was expecting that *before the keyBy* I would have a balanced
stream, which is not happening.

Basically, I want to see the difference in records/sec between operators
when I use rebalance or any other physical partition strategy. However,
I found no difference in the records/sec metrics of any operator across
the 3 different physical partition strategies. Screenshots of
Prometheus+Grafana are attached.

Maybe I am measuring the wrong operator, maybe I am not using rebalance
in the right way, or maybe this is not a good use case to test the
rebalance transformation.
I am also testing a different physical partitioner so that I can later try to
implement the issue "FLINK-1725 New Partitioner for better load balancing for
skewed data" [2]. I am not sure, but I guess that all physical partition
strategies have to be implemented on a KeyedStream.

DataStream<String> text = env.addSource(new WordSource());
// split lines into (word, 1) tuples
DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
// choose ONE partitioning strategy (leave the other two commented out)
DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer; // no physical partitioning
// DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance(); // rebalance [1]
// DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0); // partial partitioner from [2]
// count
partitionedStream.keyBy(0).sum(1).print();
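
For completeness, the Tokenizer is not shown above; it is assumed to be the
usual word-count splitter, something like:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Assumed shape of the Tokenizer used above: split each line and emit (word, 1).
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}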

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
[2] https://issues.apache.org/jira/browse/FLINK-1725

thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Felipe Gutierrez <fe...@gmail.com>.
Yes, a discussion with people who know this better than I do would be very welcome.

Basically, I am trying to implement the issue FLINK-1725 [1], which was given
up on in March 2017. Stephan Ewen said that there are more issues to be fixed
before going ahead with this implementation, and I don't really know which ones they are.

[1] https://issues.apache.org/jira/browse/FLINK-1725

Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Mon, Sep 23, 2019 at 3:47 PM Biao Liu <mm...@gmail.com> wrote:

> Wow, that's really cool! There are indeed a lot works you have done. IMO
> it's beyond the scope of user group somewhat.
>
> Just one small concern, I'm not sure I have fully understood your way of
> "tackle data skew by altering the way Flink partition keys using
> KeyedStream".
>
> From my understanding, key-group is used for rescaling job. Like
> supporting reusing state after changing the parallelism of operator.
> I'm not sure whether you are in the right direction or not. It seems that
> you are implementing something deeper than user interface. User interface
> is stable, while implementation is not. Usually it's not recommended to
> support a feature based on implementation.
>
> If you have strong reasons to change the implementation, I would suggest
> to start a discussion in dev mailing list. Maybe it could be supported
> officially. What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>>
>> I`ve implemented a combiner [1] in Flink by extending
>> OneInputStreamOperator in Flink. I call my operator using "transform".
>> It works well and I guess it is useful if I import this operator in the
>> DataStream.java. I just need more to check if I need to touch other parts
>> of the source code.
>>
>> But now I want to tackle data skew by altering the way Flink partition
>> keys using KeyedStream.
>>
>> [1]
>> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mm...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> If I understand correctly, you want to solve data skew caused by
>>> imbalanced key?
>>>
>>> There is a common strategy to solve this kind of problem,
>>> pre-aggregation. Like combiner of MapReduce.
>>> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
>>> afraid you have to implement it by yourself.
>>>
>>> For example, introducing a function caching some data (time or count
>>> based). This function should be before "keyby". And it's on a non-keyed
>>> stream. It does pre-aggregation just like what the aggregation after
>>> "keyby" does. In this way, the skewed keyed data would be reduced a lot.
>>>
>>> I also found a suggestion [1] from Fabian, although it's long time ago.
>>>
>>> Hope it helps.
>>>
>>> 1.
>>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> thanks Biao,
>>>>
>>>> I see. To achieve what I want to do I need to work with KeyedStream. I
>>>> downloaded the Flink source code to learn and alter the KeyedStream to my
>>>> needs. I am not sure but it is a lot of work because as far as I understood
>>>> the key-groups have to be predictable [1]. and altering this touches a lot
>>>> of other parts of the source code.
>>>>
>>>> However, If I guarantee that they (key-groups) are predictable, I will
>>>> be able to rebalance, rescale, .... the keys to other worker-nodes.
>>>>
>>>> [1]
>>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>>
>>>> Thanks,
>>>> Felipe
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>>
>>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mm...@gmail.com> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> Flink job graph is DAG based. It seems that you set an "edge property"
>>>>> (partitioner) several times.
>>>>> Flink does not support multiple partitioners on one edge. The later
>>>>> one overrides the priors. That means the "keyBy" overrides the "rebalance"
>>>>> and "partitionByPartial".
>>>>>
>>>>> You could insert some nodes between these partitioners to satisfy your
>>>>> requirement. For example,
>>>>> `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>>
>>>>>> I am executing a data stream application which uses rebalance.
>>>>>> Basically I am counting words using "src -> split ->
>>>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>>>> examples, one without physical partition strategy, one with rebalance
>>>>>> strategy [1], and one with partial partition strategy from [2].
>>>>>> I know that the keyBy operator actually kills what rebalance is doing
>>>>>> because it splits the stream by key and if I have a stream with skewed key,
>>>>>> one parallel instance of the operator after the keyBy will be overloaded.
>>>>>> However, I was expecting that *before the keyBy* I would have a
>>>>>> balanced stream, which is not happening.
>>>>>>
>>>>>> Basically, I want to see the difference in records/sec between
>>>>>> operators when I use rebalance or any other physical partition strategy.
>>>>>> However, when I found no difference in the records/sec metrics of all
>>>>>> operators when I am running 3 different physical partition strategies.
>>>>>> Screenshots of Prometheus+Grafana are attached.
>>>>>>
>>>>>> Maybe I am measuring the wrong operator, or maybe I am not using the
>>>>>> rebalance in the right way, or I am not doing a good use case to test the
>>>>>> rebalance transformation.
>>>>>> I am also testing a different physical partition to later try to
>>>>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>>>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>>>>> partition strategies have to be implemented on a KeyedStream.
>>>>>>
>>>>>> DataStream<String> text = env.addSource(new WordSource());
>>>>>> // split lines in strings
>>>>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
>>>>>> Tokenizer());
>>>>>> // choose a partitioning strategy
>>>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
>>>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>>> tokenizer.rebalance();
>>>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>>> tokenizer.partitionByPartial(0);
>>>>>> // count
>>>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>>>
>>>>>> thanks,
>>>>>> Felipe
>>>>>>
>>>>>> *--*
>>>>>> *-- Felipe Gutierrez*
>>>>>>
>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Biao Liu <mm...@gmail.com>.
Wow, that's really cool! There is indeed a lot of work you have done. IMO
it's somewhat beyond the scope of the user mailing list.

Just one small concern: I'm not sure I have fully understood your approach of
"tackle data skew by altering the way Flink partition keys using
KeyedStream".

From my understanding, key-groups are used for rescaling a job, e.g. to support
reusing state after changing the parallelism of an operator.
I'm not sure whether you are going in the right direction or not. It seems that
you are implementing something deeper than the user interface. The user interface
is stable, while the implementation is not. Usually it's not recommended to
build a feature on top of implementation details.

If you have strong reasons to change the implementation, I would suggest
starting a discussion on the dev mailing list. Maybe it could be supported
officially. What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <fe...@gmail.com>
wrote:

>
> I`ve implemented a combiner [1] in Flink by extending
> OneInputStreamOperator in Flink. I call my operator using "transform".
> It works well and I guess it is useful if I import this operator in the
> DataStream.java. I just need more to check if I need to touch other parts
> of the source code.
>
> But now I want to tackle data skew by altering the way Flink partition
> keys using KeyedStream.
>
> [1]
> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mm...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> If I understand correctly, you want to solve data skew caused by
>> imbalanced key?
>>
>> There is a common strategy to solve this kind of problem,
>> pre-aggregation. Like combiner of MapReduce.
>> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
>> afraid you have to implement it by yourself.
>>
>> For example, introducing a function caching some data (time or count
>> based). This function should be before "keyby". And it's on a non-keyed
>> stream. It does pre-aggregation just like what the aggregation after
>> "keyby" does. In this way, the skewed keyed data would be reduced a lot.
>>
>> I also found a suggestion [1] from Fabian, although it's long time ago.
>>
>> Hope it helps.
>>
>> 1.
>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> thanks Biao,
>>>
>>> I see. To achieve what I want to do I need to work with KeyedStream. I
>>> downloaded the Flink source code to learn and alter the KeyedStream to my
>>> needs. I am not sure but it is a lot of work because as far as I understood
>>> the key-groups have to be predictable [1]. and altering this touches a lot
>>> of other parts of the source code.
>>>
>>> However, If I guarantee that they (key-groups) are predictable, I will
>>> be able to rebalance, rescale, .... the keys to other worker-nodes.
>>>
>>> [1]
>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>
>>> Thanks,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mm...@gmail.com> wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> Flink job graph is DAG based. It seems that you set an "edge property"
>>>> (partitioner) several times.
>>>> Flink does not support multiple partitioners on one edge. The later one
>>>> overrides the priors. That means the "keyBy" overrides the "rebalance" and
>>>> "partitionByPartial".
>>>>
>>>> You could insert some nodes between these partitioners to satisfy your
>>>> requirement. For example,
>>>> `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> I am executing a data stream application which uses rebalance.
>>>>> Basically I am counting words using "src -> split ->
>>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>>> examples, one without physical partition strategy, one with rebalance
>>>>> strategy [1], and one with partial partition strategy from [2].
>>>>> I know that the keyBy operator actually kills what rebalance is doing
>>>>> because it splits the stream by key and if I have a stream with skewed key,
>>>>> one parallel instance of the operator after the keyBy will be overloaded.
>>>>> However, I was expecting that *before the keyBy* I would have a
>>>>> balanced stream, which is not happening.
>>>>>
>>>>> Basically, I want to see the difference in records/sec between
>>>>> operators when I use rebalance or any other physical partition strategy.
>>>>> However, when I found no difference in the records/sec metrics of all
>>>>> operators when I am running 3 different physical partition strategies.
>>>>> Screenshots of Prometheus+Grafana are attached.
>>>>>
>>>>> Maybe I am measuring the wrong operator, or maybe I am not using the
>>>>> rebalance in the right way, or I am not doing a good use case to test the
>>>>> rebalance transformation.
>>>>> I am also testing a different physical partition to later try to
>>>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>>>> partition strategies have to be implemented on a KeyedStream.
>>>>>
>>>>> DataStream<String> text = env.addSource(new WordSource());
>>>>> // split lines in strings
>>>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
>>>>> Tokenizer());
>>>>> // choose a partitioning strategy
>>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
>>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>> tokenizer.rebalance();
>>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>> tokenizer.partitionByPartial(0);
>>>>> // count
>>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>>
>>>>> thanks,
>>>>> Felipe
>>>>>
>>>>> *--*
>>>>> *-- Felipe Gutierrez*
>>>>>
>>>>> *-- skype: felipe.o.gutierrez*
>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>
>>>>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Felipe Gutierrez <fe...@gmail.com>.
I've implemented a combiner [1] in Flink by extending OneInputStreamOperator.
I call my operator using "transform". It works well, and I guess it would be
useful if I integrated this operator into DataStream.java. I still need to
check whether I have to touch other parts of the source code.
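
Roughly, the shape of the operator is something like the sketch below (the
class name and flush threshold are made up here for illustration, the buffer
is not checkpointed, and the full implementation is the one described in [1]):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical count-based combiner: buffers partial (word, count) sums in
// memory and flushes them downstream every maxRecords input records.
// Note: the buffer is NOT checkpointed here, so data can be lost on failure.
public class CountCombiner
        extends AbstractStreamOperator<Tuple2<String, Integer>>
        implements OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final int maxRecords;
    private transient Map<String, Integer> buffer;
    private transient int seen;

    public CountCombiner(int maxRecords) {
        this.maxRecords = maxRecords;
    }

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
        seen = 0;
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
        Tuple2<String, Integer> value = element.getValue();
        buffer.merge(value.f0, value.f1, Integer::sum);
        if (++seen >= maxRecords) {
            // flush the partial sums downstream and reset the buffer
            for (Map.Entry<String, Integer> e : buffer.entrySet()) {
                output.collect(new StreamRecord<>(new Tuple2<>(e.getKey(), e.getValue())));
            }
            buffer.clear();
            seen = 0;
        }
    }
}

// usage sketch, with Types from org.apache.flink.api.common.typeinfo:
// tokenizer.transform("pre-combiner", Types.TUPLE(Types.STRING, Types.INT), new CountCombiner(1000))
//          .keyBy(0).sum(1).print();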

But now I want to tackle data skew by altering the way Flink partitions keys
when using KeyedStream.

[1]
https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mm...@gmail.com> wrote:

> Hi Felipe,
>
> If I understand correctly, you want to solve data skew caused by
> imbalanced key?
>
> There is a common strategy to solve this kind of problem, pre-aggregation.
> Like combiner of MapReduce.
> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
> afraid you have to implement it by yourself.
>
> For example, introducing a function caching some data (time or count
> based). This function should be before "keyby". And it's on a non-keyed
> stream. It does pre-aggregation just like what the aggregation after
> "keyby" does. In this way, the skewed keyed data would be reduced a lot.
>
> I also found a suggestion [1] from Fabian, although it's long time ago.
>
> Hope it helps.
>
> 1.
> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> thanks Biao,
>>
>> I see. To achieve what I want to do I need to work with KeyedStream. I
>> downloaded the Flink source code to learn and alter the KeyedStream to my
>> needs. I am not sure but it is a lot of work because as far as I understood
>> the key-groups have to be predictable [1]. and altering this touches a lot
>> of other parts of the source code.
>>
>> However, If I guarantee that they (key-groups) are predictable, I will be
>> able to rebalance, rescale, .... the keys to other worker-nodes.
>>
>> [1]
>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mm...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> Flink job graph is DAG based. It seems that you set an "edge property"
>>> (partitioner) several times.
>>> Flink does not support multiple partitioners on one edge. The later one
>>> overrides the priors. That means the "keyBy" overrides the "rebalance" and
>>> "partitionByPartial".
>>>
>>> You could insert some nodes between these partitioners to satisfy your
>>> requirement. For example,
>>> `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> I am executing a data stream application which uses rebalance.
>>>> Basically I am counting words using "src -> split ->
>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>> examples, one without physical partition strategy, one with rebalance
>>>> strategy [1], and one with partial partition strategy from [2].
>>>> I know that the keyBy operator actually kills what rebalance is doing
>>>> because it splits the stream by key and if I have a stream with skewed key,
>>>> one parallel instance of the operator after the keyBy will be overloaded.
>>>> However, I was expecting that *before the keyBy* I would have a
>>>> balanced stream, which is not happening.
>>>>
>>>> Basically, I want to see the difference in records/sec between
>>>> operators when I use rebalance or any other physical partition strategy.
>>>> However, when I found no difference in the records/sec metrics of all
>>>> operators when I am running 3 different physical partition strategies.
>>>> Screenshots of Prometheus+Grafana are attached.
>>>>
>>>> Maybe I am measuring the wrong operator, or maybe I am not using the
>>>> rebalance in the right way, or I am not doing a good use case to test the
>>>> rebalance transformation.
>>>> I am also testing a different physical partition to later try to
>>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>>> partition strategies have to be implemented on a KeyedStream.
>>>>
>>>> DataStream<String> text = env.addSource(new WordSource());
>>>> // split lines in strings
>>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
>>>> Tokenizer());
>>>> // choose a partitioning strategy
>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>> tokenizer.rebalance();
>>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>>> tokenizer.partitionByPartial(0);
>>>> // count
>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>
>>>> thanks,
>>>> Felipe
>>>>
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Biao Liu <mm...@gmail.com>.
Hi Felipe,

If I understand correctly, you want to solve data skew caused by imbalanced keys?

There is a common strategy to solve this kind of problem: pre-aggregation,
like the combiner in MapReduce.
But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
afraid you have to implement it yourself.

For example, you could introduce a function that caches some data (time- or
count-based). This function should sit before the "keyby", so it works on the
non-keyed stream. It does the same pre-aggregation that the aggregation after
the "keyby" does. In this way, the skewed keyed data would be reduced a lot.

I also found a suggestion [1] from Fabian, although it's from a long time ago.

Hope it helps.

1.
https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <fe...@gmail.com>
wrote:

> thanks Biao,
>
> I see. To achieve what I want to do I need to work with KeyedStream. I
> downloaded the Flink source code to learn and alter the KeyedStream to my
> needs. I am not sure but it is a lot of work because as far as I understood
> the key-groups have to be predictable [1]. and altering this touches a lot
> of other parts of the source code.
>
> However, If I guarantee that they (key-groups) are predictable, I will be
> able to rebalance, rescale, .... the keys to other worker-nodes.
>
> [1]
> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mm...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> Flink job graph is DAG based. It seems that you set an "edge property"
>> (partitioner) several times.
>> Flink does not support multiple partitioners on one edge. The later one
>> overrides the priors. That means the "keyBy" overrides the "rebalance" and
>> "partitionByPartial".
>>
>> You could insert some nodes between these partitioners to satisfy your
>> requirement. For example,
>> `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> I am executing a data stream application which uses rebalance. Basically
>>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>>> keyBy -> sum -> print". I am running 3 examples, one without physical
>>> partition strategy, one with rebalance strategy [1], and one with
>>> partial partition strategy from [2].
>>> I know that the keyBy operator actually kills what rebalance is doing
>>> because it splits the stream by key and if I have a stream with skewed key,
>>> one parallel instance of the operator after the keyBy will be overloaded.
>>> However, I was expecting that *before the keyBy* I would have a
>>> balanced stream, which is not happening.
>>>
>>> Basically, I want to see the difference in records/sec between operators
>>> when I use rebalance or any other physical partition strategy. However,
>>> when I found no difference in the records/sec metrics of all operators when
>>> I am running 3 different physical partition strategies. Screenshots of
>>> Prometheus+Grafana are attached.
>>>
>>> Maybe I am measuring the wrong operator, or maybe I am not using the
>>> rebalance in the right way, or I am not doing a good use case to test the
>>> rebalance transformation.
>>> I am also testing a different physical partition to later try to
>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>> partition strategies have to be implemented on a KeyedStream.
>>>
>>> DataStream<String> text = env.addSource(new WordSource());
>>> // split lines in strings
>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
>>> Tokenizer());
>>> // choose a partitioning strategy
>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>> tokenizer.rebalance();
>>> DataStream<Tuple2<String, Integer>> partitionedStream =
>>> tokenizer.partitionByPartial(0);
>>> // count
>>> partitionedStream.keyBy(0).sum(1).print();
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>
>>> thanks,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Felipe Gutierrez <fe...@gmail.com>.
thanks Biao,

I see. To achieve what I want I need to work with KeyedStream. I downloaded
the Flink source code to learn from it and adapt the KeyedStream to my needs.
I am not sure, but it looks like a lot of work, because as far as I understood
the key-groups have to be predictable [1], and altering this touches a lot of
other parts of the source code.

However, if I guarantee that they (the key-groups) are predictable, I will be
able to rebalance, rescale, ... the keys to other worker nodes.

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
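
To make the "predictable" part concrete, here is a small sketch (assuming the
KeyGroupRangeAssignment utility from flink-runtime) of how a key is routed to
a subtask today; any alternative partitioner would have to keep this mapping
reconstructable so state can be restored after rescaling:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRoutingDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        int parallelism = 4;      // parallelism of the keyed operator
        String key = "some-word";

        // key -> key-group: murmurHash(key.hashCode()) % maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // key-group -> subtask index: keyGroup * parallelism / maxParallelism
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println(key + " -> key-group " + keyGroup + " -> subtask " + subtask);
    }
}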

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mm...@gmail.com> wrote:

> Hi Felipe,
>
> Flink job graph is DAG based. It seems that you set an "edge property"
> (partitioner) several times.
> Flink does not support multiple partitioners on one edge. The later one
> overrides the priors. That means the "keyBy" overrides the "rebalance" and
> "partitionByPartial".
>
> You could insert some nodes between these partitioners to satisfy your
> requirement. For example,
> `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> I am executing a data stream application which uses rebalance. Basically
>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>> keyBy -> sum -> print". I am running 3 examples, one without physical
>> partition strategy, one with rebalance strategy [1], and one with
>> partial partition strategy from [2].
>> I know that the keyBy operator actually kills what rebalance is doing
>> because it splits the stream by key and if I have a stream with skewed key,
>> one parallel instance of the operator after the keyBy will be overloaded.
>> However, I was expecting that *before the keyBy* I would have a balanced
>> stream, which is not happening.
>>
>> Basically, I want to see the difference in records/sec between operators
>> when I use rebalance or any other physical partition strategy. However,
>> when I found no difference in the records/sec metrics of all operators when
>> I am running 3 different physical partition strategies. Screenshots of
>> Prometheus+Grafana are attached.
>>
>> Maybe I am measuring the wrong operator, or maybe I am not using the
>> rebalance in the right way, or I am not doing a good use case to test the
>> rebalance transformation.
>> I am also testing a different physical partition to later try to
>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>> for skewed data" [2]. I am not sure, but I guess that all physical
>> partition strategies have to be implemented on a KeyedStream.
>>
>> DataStream<String> text = env.addSource(new WordSource());
>> // split lines in strings
>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
>> Tokenizer());
>> // choose a partitioning strategy
>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
>> DataStream<Tuple2<String, Integer>> partitionedStream =
>> tokenizer.rebalance();
>> DataStream<Tuple2<String, Integer>> partitionedStream =
>> tokenizer.partitionByPartial(0);
>> // count
>> partitionedStream.keyBy(0).sum(1).print();
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>
>> thanks,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

Posted by Biao Liu <mm...@gmail.com>.
Hi Felipe,

Flink's job graph is a DAG. It seems that you set an "edge property"
(the partitioner) several times.
Flink does not support multiple partitioners on one edge; the later one
overrides the earlier ones. That means the "keyBy" overrides the "rebalance" and
"partitionByPartial".

You could insert some nodes between these partitioners to satisfy your
requirement. For example,
`sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
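
Applied to your word-count job, that could look roughly like the sketch below
(tokenizer is the flatMap output from your snippet; the identity map is only a
stand-in operator so the rebalanced edge gets its own downstream task, whose
records/sec metrics should then show the balanced distribution):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// rebalance -> map -> keyBy: the rebalance partitioner now owns the edge
// tokenizer -> map, so its effect is visible in the map's input metrics.
DataStream<Tuple2<String, Integer>> balanced = tokenizer
        .rebalance()
        .map(t -> t)
        .returns(Types.TUPLE(Types.STRING, Types.INT)); // needed because the lambda erases the tuple type

balanced.keyBy(0).sum(1).print();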

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <fe...@gmail.com>
wrote:

> I am executing a data stream application which uses rebalance. Basically I
> am counting words using "src -> split -> physicalPartitionStrategy -> keyBy
> -> sum -> print". I am running 3 examples, one without physical
> partition strategy, one with rebalance strategy [1], and one with
> partial partition strategy from [2].
> I know that the keyBy operator actually kills what rebalance is doing
> because it splits the stream by key and if I have a stream with skewed key,
> one parallel instance of the operator after the keyBy will be overloaded.
> However, I was expecting that *before the keyBy* I would have a balanced
> stream, which is not happening.
>
> Basically, I want to see the difference in records/sec between operators
> when I use rebalance or any other physical partition strategy. However,
> when I found no difference in the records/sec metrics of all operators when
> I am running 3 different physical partition strategies. Screenshots of
> Prometheus+Grafana are attached.
>
> Maybe I am measuring the wrong operator, or maybe I am not using the
> rebalance in the right way, or I am not doing a good use case to test the
> rebalance transformation.
> I am also testing a different physical partition to later try to implement
> the issue "FLINK-1725 New Partitioner for better load balancing for skewed
> data" [2]. I am not sure, but I guess that all physical partition
> strategies have to be implemented on a KeyedStream.
>
> DataStream<String> text = env.addSource(new WordSource());
> // split lines in strings
> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new
> Tokenizer());
> // choose a partitioning strategy
> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
> DataStream<Tuple2<String, Integer>> partitionedStream =
> tokenizer.rebalance();
> DataStream<Tuple2<String, Integer>> partitionedStream =
> tokenizer.partitionByPartial(0);
> // count
> partitionedStream.keyBy(0).sum(1).print();
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
> [2] https://issues.apache.org/jira/browse/FLINK-1725
>
> thanks,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>