Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2022/03/04 22:54:52 UTC

Controlling group partitioning with DataStream

Hi all,

I need to be able to control which slot a keyBy group goes to, in order to compensate for a badly skewed dataset.

Any recommended approach to use here?

Previously (with a DataSet) I used groupBy followed by a withPartitioner, and provided my own custom partitioner.
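
For readers unfamiliar with the DataSet approach described above, here is a minimal, self-contained sketch of what such a custom partitioner might look like. The `StandInPartitioner` interface is a local stand-in that only mirrors the shape of Flink's `org.apache.flink.api.common.functions.Partitioner` (one `partition(key, numPartitions)` method), and `LookupPartitioner` is a hypothetical class, not the code actually used in the job discussed here.

```java
import java.util.HashMap;
import java.util.Map;

/** Local stand-in mirroring the shape of Flink's Partitioner<K>; not the Flink interface itself. */
interface StandInPartitioner<K> {
    int partition(K key, int numPartitions);
}

/** Hypothetical partitioner that pins each known key to an explicitly chosen partition. */
final class LookupPartitioner implements StandInPartitioner<String> {
    private final Map<String, Integer> assignment;

    LookupPartitioner(Map<String, Integer> assignment) {
        // Defensive copy so later changes to the caller's map do not affect routing.
        this.assignment = new HashMap<>(assignment);
    }

    @Override
    public int partition(String key, int numPartitions) {
        Integer target = assignment.get(key);
        if (target == null) {
            // Keys without an explicit assignment fall back to plain hash partitioning.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (target < 0 || target >= numPartitions) {
            throw new IllegalArgumentException(
                    "Partition " + target + " is out of range for key " + key);
        }
        return target;
    }
}
```

In a DataSet job, an implementation of the real Flink interface would be the object handed to withPartitioner after groupBy.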

I posted this same question to https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


Re: Controlling group partitioning with DataStream

Posted by Ken Krugler <kk...@transpac.com>.
Hi Guowei,

Thanks for following up on this, sorry I missed your email earlier.

Unfortunately I don’t think auto-rebalancing will help my situation, because I have a small number of unique key values (low cardinality).

And processing these groups (training one deep-learning model per group) requires a lot of memory, so I need to ensure only one group per slot.

Regards,

— Ken







Re: Controlling group partitioning with DataStream

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Ken

If you are talking about the batch scenario, another option is for the
engine to automatically and evenly distribute the data to be processed by
each stage across the worker nodes. This also means that, in some cases,
the user does not need to manually define a Partitioner.

At present, Flink has FLIP-187 [1], which works in this direction, but
achieving the above goal may also require the follow-up work of
FLIP-186 [2]. After the release of 1.15, we will carry out the
"Auto-rebalancing of workloads" related work as soon as possible; you can
follow the progress of this FLIP.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Futureimprovements

Best,
Guowei



Re: Controlling group partitioning with DataStream

Posted by Ken Krugler <kk...@transpac.com>.
Hi Dario,

Just to close the loop on this, I answered my own question on SO.

Unfortunately it seems like the recommended solution is to do the same hack I did a while ago, which is to generate (via trial-and-error) a key that gets assigned to the target slot.
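
As an illustration of that trial-and-error approach, here is a minimal sketch that searches for one integer key per target subtask (what this thread calls the target slot). The class and method names are hypothetical, and `standInSubtaskFor` is only a stand-in: it follows the key -> key group -> subtask index scheme, but its hash is not Flink's. In a real job the assigner passed to `findKeys` would presumably delegate to `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` from flink-runtime, which is not public API, so check it against your Flink version.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical helper: finds one integer key per target subtask by trial and error. */
final class SubtaskKeyFinder {

    /**
     * Stand-in for Flink's key routing (key -> key group -> subtask index).
     * The multiplicative hash below is NOT Flink's hash; it only keeps the sketch runnable.
     */
    static int standInSubtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key * 0x9E3779B1, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    /** Tries candidate keys 0, 1, 2, ... until every subtask index has a key routed to it. */
    static int[] findKeys(int parallelism, IntUnaryOperator subtaskForKey, int maxCandidates) {
        int[] keyForSubtask = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        for (int candidate = 0; candidate < maxCandidates && remaining > 0; candidate++) {
            int subtask = subtaskForKey.applyAsInt(candidate);
            if (!found[subtask]) {
                // First candidate that lands on this subtask becomes its surrogate key.
                found[subtask] = true;
                keyForSubtask[subtask] = candidate;
                remaining--;
            }
        }
        if (remaining > 0) {
            throw new IllegalStateException("No key found for " + remaining + " subtask(s)");
        }
        return keyForSubtask;
    }
}
```

With the resulting array, records of the group intended for subtask i would be keyed by `keyForSubtask[i]` instead of their original key, so that keyBy routes each group to its own subtask.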

I was hoping for something a bit more elegant :)

I think it’s likely I could make it work by implementing my own version of KeyGroupStreamPartitioner, but as I’d noted in my SO question, that would involve use of some internal-only classes, so maybe not a win.

— Ken







Re: Controlling group partitioning with DataStream

Posted by Dario Heinisch <da...@gmail.com>.
Hi,

I think you are looking for this answer from David: 
https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc

I think you could then technically create your own partitioner, though it
is a little bit cumbersome, by mapping your existing keys to new keys that
will then be routed to the desired group & slot.

Hope this may help,

Dario
