Posted to dev@flink.apache.org by Martin Neumann <mn...@spotify.com> on 2014/10/28 20:53:48 UTC

load balancing groups

I have a problem with load balancing and was wondering how to deal with
this kind of problem in Flink.
The input I have is a data set of grouped IDs that I join with metadata
for each ID. Then I need to compare each item in a group with every other
item in that group and, if necessary, split the group into different
subgroups. In Flink it's a join followed by a group reduce.

The problem is that the groups differ a lot in size. 90% of the groups are
done in 5 minutes while the rest take 2 hours. To make this more
efficient I would need to distribute the N to N comparison that is
currently done in the group reduce function. Does anyone have an idea how
I can do that in a simple way?

My current idea is to make the group reduce step emit computation
partitions and then do another flat-map step to do the actual computation.
Would this solve the problem?

Cheers, Martin
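
To see why a few large groups can dominate the runtime: comparing every
item in a group with every other item takes n(n-1)/2 comparisons for a
group of n items, so the cost grows quadratically with group size. A
minimal sketch with made-up group sizes (the thread does not give the real
ones):

```python
def pair_count(n: int) -> int:
    """Number of unordered pairs among n items: n * (n - 1) / 2."""
    return n * (n - 1) // 2


# Hypothetical group sizes: nine small groups and one large one.
group_sizes = [100] * 9 + [10_000]

costs = [pair_count(n) for n in group_sizes]
total = sum(costs)

# Fraction of all comparisons that fall into the single largest group.
largest_share = max(costs) / total
```

With these assumed sizes, nearly all of the comparisons sit in the one
large group, which matches the pattern of most groups finishing quickly
while a few run for hours.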

Re: load balancing groups

Posted by Fabian Hueske <fh...@apache.org>.
Hmm, I just found that there is no JoinHint that would allow what I
described above.

Broadcasting one input and using the other one to build the hash tables is
usually not a good thing to do, because the broadcast side should be much
smaller than the other one...

2014-10-31 21:56 GMT+01:00 Fabian Hueske <fh...@apache.org>:

> Just had another idea.
> The group-wise crossing that you are doing is actually a self-join on the
> grouping key.
> The system currently has no special strategy to deal with self-joins. That
> means both inputs of the join (which are identical) are treated as two
> individual inputs. If you force a broadcast of one side and build a
> hash table on the other, partitioned side, the following would happen:
>
> The broadcast input would be replicated and sent to each individual
> worker thread. The other input would remain local and still be partitioned,
> and therefore smaller on each node. That's why you would build the
> hash table from the partitioned input. The larger, replicated input would
> be streamed along the hash tables. Because the inputs are not partitioned
> on the key, there should be no load-balancing issues (depending on the
> previous partitioning, it can even be perfectly balanced...)
>
> However, this might not work (well) if the input is too large to be
> replicated many times (or the smaller partitions are too large for
> in-memory hash tables).
>
> Best, Fabian

Re: load balancing groups

Posted by Fabian Hueske <fh...@apache.org>.
Just had another idea.
The group-wise crossing that you are doing is actually a self-join on the
grouping key.
The system currently has no special strategy to deal with self-joins. That
means both inputs of the join (which are identical) are treated as two
individual inputs. If you force a broadcast of one side and build a
hash table on the other, partitioned side, the following would happen:

The broadcast input would be replicated and sent to each individual worker
thread. The other input would remain local and still be partitioned, and
therefore smaller on each node. That's why you would build the hash table
from the partitioned input. The larger, replicated input would be streamed
along the hash tables. Because the inputs are not partitioned on the key,
there should be no load-balancing issues (depending on the previous
partitioning, it can even be perfectly balanced...)

However, this might not work (well) if the input is too large to be
replicated many times (or the smaller partitions are too large for
in-memory hash tables).

Best, Fabian
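
The broadcast self-join described above can be simulated in plain Python
to see how the work spreads out. This is only a sketch under assumptions,
not Flink code: the function name broadcast_self_join, the round-robin
partitioning, and the sample data are all made up for illustration.

```python
from collections import defaultdict


def broadcast_self_join(records, num_workers):
    """Simulate a broadcast hash self-join on the group key.

    records: list of (group_key, item) tuples.
    Returns one list of joined (key, item, other) tuples per simulated worker.
    """
    # Partitioned side: spread records round-robin, NOT by key, so a
    # large group is scattered over all workers.
    partitions = [records[w::num_workers] for w in range(num_workers)]

    results = []
    for local in partitions:
        # Build side: hash table over this worker's local partition.
        table = defaultdict(list)
        for key, item in local:
            table[key].append(item)

        # Probe side: the full (replicated) input is streamed past the table.
        pairs = []
        for key, item in records:
            for other in table.get(key, ()):
                if item != other:  # skip pairing an item with itself
                    pairs.append((key, item, other))
        results.append(pairs)
    return results


# One large group "a" (6 items) and one small group "b" (2 items).
data = [("a", i) for i in range(6)] + [("b", i) for i in range(2)]
per_worker = broadcast_self_join(data, num_workers=2)
```

In this toy run each simulated worker produces the same number of pairs,
because the large group is split across both hash tables. The sketch emits
ordered pairs, so a symmetric comparison would see every pair twice;
keeping only pairs with item < other would halve the work.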

2014-10-30 17:56 GMT+01:00 Fabian Hueske <fh...@apache.org>:

> Hi Martin,
>
> Flink does not have features to mitigate data skew at the moment, such as
> dynamic partitioning.
> Even that would "only" allow processing large groups as individual
> partitions and multiple smaller groups together in other partitions.
> The issue of having a single large group would not be solved by that. This
> has to be handled more at the application level right now and could, for
> example, be solved by adding something like a group-cross operator...
>
> I think your approach of emitting multiple smaller partitions from a
> group-reduce, reshuffling them (there is a rebalance operator [1]), and
> applying a flatmap sounds like a good idea to me.
> At least, I didn't come up with a better approach ;-)
>
> Cheers, Fabian
>
> [1]
> http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#transformations

Re: load balancing groups

Posted by Fabian Hueske <fh...@apache.org>.
Hi Martin,

Flink does not have features to mitigate data skew at the moment, such as
dynamic partitioning.
Even that would "only" allow processing large groups as individual
partitions and multiple smaller groups together in other partitions.
The issue of having a single large group would not be solved by that. This
has to be handled more at the application level right now and could, for
example, be solved by adding something like a group-cross operator...
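
A small illustration of that limit, as a sketch only: if whole groups are
assigned to partitions, even a good assignment cannot make the busiest
partition cheaper than the most expensive single group. The greedy
longest-first strategy and the cost numbers below are assumptions chosen
for the example, not something Flink provides.

```python
import heapq


def assign_groups(costs, num_partitions):
    """Greedy longest-first assignment of whole groups to partitions.

    Returns the total cost assigned to each partition, sorted ascending.
    """
    loads = [0] * num_partitions
    heapq.heapify(loads)
    for cost in sorted(costs, reverse=True):
        # Always give the next group to the currently lightest partition.
        lightest = heapq.heappop(loads)
        heapq.heappush(loads, lightest + cost)
    return sorted(loads)


# Hypothetical per-group costs: one huge group and thirty small ones.
group_costs = [1000] + [10] * 30
loads = assign_groups(group_costs, num_partitions=4)
```

Here the small groups end up evenly spread over three partitions, while
the fourth partition holds only the huge group and still determines the
overall runtime.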

I think your approach of emitting multiple smaller partitions from a
group-reduce, reshuffling them (there is a rebalance operator [1]), and
applying a flatmap sounds like a good idea to me.
At least, I didn't come up with a better approach ;-)

Cheers, Fabian

[1]
http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#transformations
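
One way the "emit partitions, reshuffle, then flat-map" approach could
look, sketched in plain Python rather than Flink. The helper names
emit_blocks and compare_block, the block_size parameter, and the layout of
a work unit are all hypothetical; the thread does not specify them.

```python
from itertools import combinations


def emit_blocks(key, items, block_size):
    """Group-reduce stand-in: split one group's pairwise comparison into
    independent work units, each holding at most two chunks of items."""
    chunks = [items[i:i + block_size] for i in range(0, len(items), block_size)]
    for i, left in enumerate(chunks):
        for j in range(i, len(chunks)):
            # i == j marks a unit that compares items within a single chunk.
            yield (key, i, j, left, chunks[j])


def compare_block(unit):
    """Flat-map stand-in: produce every item pair covered by one work unit."""
    key, i, j, left, right = unit
    if i == j:
        return [(key, a, b) for a, b in combinations(left, 2)]
    return [(key, a, b) for a in left for b in right]


group = list(range(10))
units = list(emit_blocks("g", group, block_size=4))

# In Flink the units would be redistributed (e.g. with rebalance) before
# this step; here they are simply processed one after another.
pairs = [p for u in units for p in compare_block(u)]
```

Every pair of the group is covered by exactly one unit, so the units can
be processed in any order and on any worker. The trade-off is data volume:
each chunk is copied into roughly as many units as there are chunks, so a
smaller block_size gives finer-grained work at the cost of more
replication.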
