Posted to dev@flink.apache.org by David Morávek <dm...@apache.org> on 2020/04/27 07:34:54 UTC

Multiple rebalances are incorrectly ignored in some cases.

Hello Flinkers,

we have run into unexpected behaviour with chained Reshuffles in Apache
Beam's Flink runner (batch).

In the Flink optimizer, when we `.rebalance()` a dataset, its output channel
is marked as `FORCED_REBALANCED`. When we chain this with another
`.rebalance()`, the latter is ignored because its source is already
`FORCED_REBALANCED`, so the requested property is already met. This is
correct behaviour, because rebalance is idempotent.
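The pruning rule can be illustrated with a minimal toy model. `PlanNode`, `Distribution`, and `countShuffles` are hypothetical names made up for this sketch and are not Flink's optimizer classes. In the model, a rebalance requests `FORCED_REBALANCED` on its input and is dropped when the input already claims that property.

```java
// Hypothetical toy model, NOT Flink's optimizer classes: it only illustrates
// how a requested property that is already met lets a planner drop a shuffle.
enum Distribution { ARBITRARY, FORCED_REBALANCED }

final class PlanNode {
    final String name;
    final Distribution output; // property this node claims for its output
    final PlanNode input;      // upstream node, or null for a source

    PlanNode(String name, Distribution output, PlanNode input) {
        this.name = name;
        this.output = output;
        this.input = input;
    }

    static PlanNode source() {
        return new PlanNode("source", Distribution.ARBITRARY, null);
    }

    // A rebalance requests FORCED_REBALANCED. If this node already provides it,
    // the request is met and no new node is added (rebalance is idempotent).
    PlanNode rebalance() {
        if (output == Distribution.FORCED_REBALANCED) {
            return this;
        }
        return new PlanNode("rebalance", Distribution.FORCED_REBALANCED, this);
    }

    // Counts the rebalance nodes between this node and the source.
    int countShuffles() {
        int count = 0;
        for (PlanNode node = this; node != null; node = node.input) {
            if (node.name.equals("rebalance")) {
                count++;
            }
        }
        return count;
    }
}
```

For example, `PlanNode.source().rebalance().rebalance()` returns the same node as a single `PlanNode.source().rebalance()`, so only one shuffle ends up in the modelled plan.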

When we include a `flatMap` between the rebalances, as in
`.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
because the dataset distribution may have changed (e.g. you can possibly emit
an unbounded stream from a single element). Unfortunately, the `flatMap`
output is still incorrectly marked as `FORCED_REBALANCED`, and the second
reshuffle gets ignored.
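A small simulation shows why the property should not survive the `flatMap`. This is a sketch under simplifying assumptions: `SkewDemo` and its methods are hypothetical, rebalancing is modelled as one global round-robin (Flink's rebalance round-robins per sending subtask), and the `flatMap` runs partition-locally, so its output stays where its input was.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical simulation (not Flink runtime code) of how a
// flatMap can skew data that a preceding rebalance had spread evenly.
final class SkewDemo {

    // Assumption: one global round-robin over all elements. Flink's real
    // rebalance round-robins independently in each sending subtask.
    static List<List<Integer>> rebalance(List<Integer> elements, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            partitions.get(i % parallelism).add(elements.get(i));
        }
        return partitions;
    }

    // The flatMap runs partition-locally (no shuffle), so every output element
    // stays in the partition that held its input element.
    static List<List<Integer>> flatMap(List<List<Integer>> partitions,
                                       Function<Integer, List<Integer>> fn) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            List<Integer> produced = new ArrayList<>();
            for (Integer element : partition) {
                produced.addAll(fn.apply(element));
            }
            out.add(produced);
        }
        return out;
    }

    // Hypothetical fan-out: element 0 emits 1000 copies, all others emit one.
    static List<Integer> fanOut(Integer x) {
        return x == 0 ? Collections.nCopies(1000, x) : List.of(x);
    }

    static List<Integer> flatten(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            all.addAll(partition);
        }
        return all;
    }

    static int[] sizes(List<List<Integer>> partitions) {
        return partitions.stream().mapToInt(List::size).toArray();
    }
}
```

With 8 input elements and parallelism 4, the first rebalance yields partition sizes `[2, 2, 2, 2]`. When element `0` fans out into 1000 copies, the `flatMap` output becomes `[1001, 2, 2, 2]`, and only a second rebalance brings it back to `[252, 252, 252, 251]`.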

We have worked around this by replacing the repartition with an identity map
function that carries the `Optimizer.HINT_SHIP_STRATEGY_REPARTITION` hint.
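The effect of the workaround can be sketched as another toy model. Everything here (`HintDemo`, `Op`, `flatMapForwardingProperty`, `hintedIdentityMap`) is hypothetical and does not use Flink's `Optimizer` API. The model only assumes what is described above: the `flatMap` forwards `FORCED_REBALANCED`, so a plain rebalance after it is pruned, while a map whose ship strategy is pinned by a hint gets its repartition regardless of the input's properties.

```java
// Hypothetical toy planner, NOT Flink's Optimizer API. It models the bug
// (flatMap forwards FORCED_REBALANCED) and the hinted identity-map workaround.
final class HintDemo {
    enum Prop { ARBITRARY, FORCED_REBALANCED }

    // One operator in a linear plan; `shuffles` counts the repartitions
    // inserted so far on the path from the source.
    static final class Op {
        final Prop output;
        final int shuffles;

        Op(Prop output, int shuffles) {
            this.output = output;
            this.shuffles = shuffles;
        }
    }

    static Op source() {
        return new Op(Prop.ARBITRARY, 0);
    }

    // A plain rebalance is pruned when its input already claims the property.
    static Op rebalance(Op in) {
        if (in.output == Prop.FORCED_REBALANCED) {
            return in;
        }
        return new Op(Prop.FORCED_REBALANCED, in.shuffles + 1);
    }

    // The behaviour reported above: flatMap forwards its input's property
    // even though it may have changed the data distribution.
    static Op flatMapForwardingProperty(Op in) {
        return new Op(in.output, in.shuffles);
    }

    // Workaround model: the ship strategy of this identity map is pinned by a
    // hint, so the repartition is applied without checking `in.output`.
    static Op hintedIdentityMap(Op in) {
        return new Op(Prop.FORCED_REBALANCED, in.shuffles + 1);
    }
}
```

In this model, `rebalance(flatMapForwardingProperty(rebalance(source())))` still contains only one shuffle, while replacing the outer `rebalance` with `hintedIdentityMap` gives two. Changing `flatMapForwardingProperty` to return `ARBITRARY` would make the plain rebalance work again without the hint.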

I have a feeling that this is just a workaround and that the issue should be
fixed in the Flink optimizer itself.

Relevant Beam JIRA: https://issues.apache.org/jira/browse/BEAM-9824

WDYT?

Thanks,

D.

Re: Multiple rebalances are incorrectly ignored in some cases.

Posted by David Morávek <da...@gmail.com>.
Hello Aljoscha,

unfortunately not. I'm not really familiar with the optimizer code, and it's
really complex to debug :(

This method is as far as I got:
https://github.com/apache/flink/blob/master/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java#L301

D.


On Mon, Apr 27, 2020 at 11:24 AM Aljoscha Krettek <al...@apache.org>
wrote:

> On 27.04.20 09:34, David Morávek wrote:
>
> > When we include a `flatMap` between the rebalances, as in
> > `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
> > because the dataset distribution may have changed (e.g. you can possibly emit
> > an unbounded stream from a single element). Unfortunately, the `flatMap`
> > output is still incorrectly marked as `FORCED_REBALANCED`, and the second
> > reshuffle gets ignored.
>
> This indeed seems incorrect. Did you look into the Flink code to see why
> the output of the flatMap is `FORCED_REBALANCED`?
>
> Aljoscha
>

Re: Multiple rebalances are incorrectly ignored in some cases.

Posted by Aljoscha Krettek <al...@apache.org>.
On 27.04.20 09:34, David Morávek wrote:

> When we include a `flatMap` between the rebalances, as in
> `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
> because the dataset distribution may have changed (e.g. you can possibly emit
> an unbounded stream from a single element). Unfortunately, the `flatMap`
> output is still incorrectly marked as `FORCED_REBALANCED`, and the second
> reshuffle gets ignored.

This indeed seems incorrect. Did you look into the Flink code to see why 
the output of the flatMap is `FORCED_REBALANCED`?

Aljoscha