Posted to issues@beam.apache.org by "David Morávek (Jira)" <ji...@apache.org> on 2020/04/27 07:18:00 UTC
[jira] [Created] (BEAM-9824) Multiple reshuffles are ignored in some cases on Flink batch runner.
David Morávek created BEAM-9824:
-----------------------------------
Summary: Multiple reshuffles are ignored in some cases on Flink batch runner.
Key: BEAM-9824
URL: https://issues.apache.org/jira/browse/BEAM-9824
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.19.0, 2.20.0
Reporter: David Morávek
Assignee: David Morávek
Fix For: 2.21.0
Multiple reshuffles are ignored in some cases on the Flink batch runner. This can lead to a huge performance penalty in IO connectors (when reshuffling splits).
In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is marked as `FORCED_REBALANCED`. When we chain this with another `.rebalance()`, the latter is ignored because its source is already `FORCED_REBALANCED`, so the requested property is already met. This is correct behaviour, because rebalance is idempotent.
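A minimal sketch of this idempotence rule, written as a toy Python model rather than Flink's actual optimizer code. The `Partitioning` enum, the string operator names and `count_shuffles` are hypothetical; the sketch only assumes that a rebalance adds a network exchange when its input does not already carry the `FORCED_REBALANCED` property.

```python
from enum import Enum, auto


class Partitioning(Enum):
    """Hypothetical, simplified stand-ins for a channel's partitioning property."""
    ANY = auto()                # no guarantee about how records are spread
    FORCED_REBALANCED = auto()  # output of an explicit rebalance


def count_shuffles(ops):
    """Count the network shuffles a toy planner emits for a chain of ops.

    Only "rebalance" is supported in this sketch.
    """
    prop = Partitioning.ANY
    shuffles = 0
    for op in ops:
        if op != "rebalance":
            raise ValueError(f"unsupported op in this sketch: {op}")
        if prop is Partitioning.FORCED_REBALANCED:
            # The requested property is already met, so no exchange is added.
            continue
        shuffles += 1
        prop = Partitioning.FORCED_REBALANCED
    return shuffles


print(count_shuffles(["rebalance"]))               # 1
print(count_shuffles(["rebalance", "rebalance"]))  # 1: the second one is elided
```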
When we include a `flatMap` between the rebalances (`.rebalance().flatMap(...).rebalance()`), we need to reshuffle again, because the dataset distribution may have changed (e.g. a single element can be expanded into an arbitrarily large number of output elements). Unfortunately, the `flatMap` output is still incorrectly marked as `FORCED_REBALANCED`, so the second reshuffle gets ignored.
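The same kind of toy model, extended with `flatMap`, makes the difference visible. Again, all names are hypothetical and this is not Flink's internal code. The `flatmap_keeps_property` flag switches between the reported behaviour (the `flatMap` output keeps `FORCED_REBALANCED`) and the expected one (the `flatMap` output carries no distribution guarantee).

```python
from enum import Enum, auto


class Partitioning(Enum):
    """Hypothetical, simplified stand-ins for a channel's partitioning property."""
    ANY = auto()
    FORCED_REBALANCED = auto()


def count_shuffles(ops, flatmap_keeps_property):
    """Count shuffles emitted by a toy planner for a chain of operators.

    flatmap_keeps_property=True models the reported bug; False models the
    expected behaviour.
    """
    prop = Partitioning.ANY
    shuffles = 0
    for op in ops:
        if op == "rebalance":
            if prop is Partitioning.FORCED_REBALANCED:
                continue  # property already satisfied: rebalance elided
            shuffles += 1
            prop = Partitioning.FORCED_REBALANCED
        elif op == "flatMap":
            # A flatMap may emit any number of records per input record, so a
            # balanced input says nothing about the balance of its output.
            if not flatmap_keeps_property:
                prop = Partitioning.ANY
        else:
            raise ValueError(f"unsupported op in this sketch: {op}")
    return shuffles


pipeline = ["rebalance", "flatMap", "rebalance"]
print(count_shuffles(pipeline, flatmap_keeps_property=True))   # 1 (bug)
print(count_shuffles(pipeline, flatmap_keeps_property=False))  # 2 (expected)
```

Only the second variant emits both shuffles the pipeline asks for.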
This especially affects IO connectors: `FileIO.match()` returns a reshuffled list of matched files -> we split each file into ranges -> **reshuffle** -> read. Ignoring the second reshuffle leads to a huge performance degradation (5m -> 2h in one of our production pipelines).
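A back-of-the-envelope illustration of why skipping the second reshuffle hurts. It assumes that every worker reads ranges at the same rate and that both reshuffles spread elements round-robin (a simplification of the real exchange). The file sizes are made up and unrelated to the production numbers above, and `worker_loads` is a hypothetical helper.

```python
def worker_loads(file_ranges, workers, reshuffle_ranges):
    """Return how many ranges each worker reads.

    file_ranges: number of ranges each matched file is split into (made-up).
    Files are assumed to be spread round-robin by the first reshuffle.
    """
    loads = [0] * workers
    if reshuffle_ranges:
        # Second reshuffle applied: every range is redistributed round-robin.
        for i in range(sum(file_ranges)):
            loads[i % workers] += 1
    else:
        # Second reshuffle ignored: all ranges of a file stay on the worker
        # that received the file.
        for i, n in enumerate(file_ranges):
            loads[i % workers] += n
    return loads


files = [100, 1, 1, 1]  # one large file and three tiny ones
print(worker_loads(files, 4, reshuffle_ranges=False))  # [100, 1, 1, 1]
print(worker_loads(files, 4, reshuffle_ranges=True))   # [26, 26, 26, 25]
```

The slowest worker bounds the read stage: in this toy case it handles 100 ranges without the second reshuffle versus 26 with it.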
--
This message was sent by Atlassian Jira
(v8.3.4#803005)