Posted to dev@flink.apache.org by "Lijie Wang (Jira)" <ji...@apache.org> on 2022/02/08 09:54:00 UTC

[jira] [Created] (FLINK-26004) Introduce ForwardForLocalKeyByPartitioner

Lijie Wang created FLINK-26004:
----------------------------------

             Summary: Introduce ForwardForLocalKeyByPartitioner
                 Key: FLINK-26004
                 URL: https://issues.apache.org/jira/browse/FLINK-26004
             Project: Flink
          Issue Type: Sub-task
            Reporter: Lijie Wang


If there are multiple consecutive identical groupBy operations (i.e. keyBy on the same key), the SQL planner changes all of them except the first to use the forward partitioner, so that these operators can be chained to avoid unnecessary shuffles.
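Why forward is safe here can be seen in a simplified model. The sketch below assumes a keyBy routes a key to `floorMod(key.hashCode(), parallelism)`; this is an illustrative assumption, not Flink's actual key-group assignment, and the class and method names are hypothetical. After the first keyBy, a second keyBy on the same key with the same parallelism would send every record back to the subtask it already occupies, so a forward edge gives the same result without a shuffle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink code. Assumption: a keyBy sends a key to
// floorMod(key.hashCode(), parallelism) instead of using Flink's key groups.
class ForwardAfterKeyByDemo {

    static int hashRoute(String key, int parallelism) {
        // floorMod keeps the index non-negative for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // First keyBy: split the records into `parallelism` subtasks by key hash.
    static List<List<String>> keyBy(List<String> records, int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (String key : records) {
            subtasks.get(hashRoute(key, parallelism)).add(key);
        }
        return subtasks;
    }

    // Forward edge: downstream subtask i receives exactly the output of
    // upstream subtask i (same parallelism, no shuffle).
    static List<List<String>> forward(List<List<String>> upstream) {
        return upstream;
    }

    // True if a second keyBy on the same key and parallelism would leave every
    // record on the subtask it already occupies.
    static boolean secondKeyByMovesNothing(List<List<String>> subtasks) {
        int parallelism = subtasks.size();
        for (int i = 0; i < parallelism; i++) {
            for (String key : subtasks.get(i)) {
                if (hashRoute(key, parallelism) != i) {
                    return false;
                }
            }
        }
        return true;
    }

    // The "local keyBy" assumption: each key is seen by exactly one subtask.
    static boolean eachKeyOnOneSubtask(List<List<String>> subtasks) {
        Map<String, Integer> owner = new HashMap<>();
        for (int i = 0; i < subtasks.size(); i++) {
            for (String key : subtasks.get(i)) {
                Integer previous = owner.putIfAbsent(key, i);
                if (previous != null && previous != i) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "a", "c", "b", "a");
        List<List<String>> afterForward = forward(keyBy(records, 3));
        System.out.println(secondKeyByMovesNothing(afterForward)); // true
        System.out.println(eachKeyOnOneSubtask(afterForward));     // true
    }
}
```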

However, the local keyBy operators are sometimes not chained (e.g. with multiple inputs), and these forward partitioners then turn into forward job edges. Such forward edges still carry the local keyBy assumption, so they cannot be changed into rescale/rebalance edges; doing so could lead to incorrect results. This prevents the adaptive batch scheduler from determining the parallelism of the job vertices downstream of these forward edges, as it can for other forward edges (see FLINK-25046).
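The incorrect-result risk can be illustrated with a small model. The sketch below assumes the forward edge is replaced by a round-robin rebalance to a different downstream parallelism; the class and method names are hypothetical and this is not Flink's implementation. Records that the upstream keyBy placed together are spread across several downstream subtasks, so a per-key aggregation downstream would see the same key in more than one place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink code. Assumption: the forward edge is turned
// into a round-robin rebalance to a new downstream parallelism.
class RebalanceBreaksLocalKeyByDemo {

    // Sends records round-robin to `downstreamParallelism` subtasks,
    // ignoring their keys.
    static List<List<String>> rebalance(List<List<String>> upstream, int downstreamParallelism) {
        List<List<String>> downstream = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream.add(new ArrayList<>());
        }
        int next = 0;
        for (List<String> subtask : upstream) {
            for (String key : subtask) {
                downstream.get(next).add(key);
                next = (next + 1) % downstreamParallelism;
            }
        }
        return downstream;
    }

    // The "local keyBy" assumption: each key is seen by exactly one subtask.
    static boolean eachKeyOnOneSubtask(List<List<String>> subtasks) {
        Map<String, Integer> owner = new HashMap<>();
        for (int i = 0; i < subtasks.size(); i++) {
            for (String key : subtasks.get(i)) {
                Integer previous = owner.putIfAbsent(key, i);
                if (previous != null && previous != i) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Upstream subtask 0 holds every record for key "a" (the keyBy guarantee).
        List<List<String>> upstream = List.of(List.of("a", "a", "a"), List.of("b"));
        List<List<String>> downstream = rebalance(upstream, 2);
        // "a" now reaches both downstream subtasks, so a per-key aggregation
        // there would emit two partial results for "a".
        System.out.println(downstream);                      // [[a, a], [a, b]]
        System.out.println(eachKeyOnOneSubtask(downstream)); // false
    }
}
```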

To solve this, I propose introducing a new {{ForwardForLocalKeyByPartitioner}}. When the SQL planner optimizes the case of multiple consecutive identical groupBy operations, it should use the proposed partitioner, so that the runtime framework can further decide whether the partitioner can be changed to hash or not.
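A minimal sketch of how such a partitioner might behave, under stated assumptions: it wraps the hash partitioner the planner would otherwise have used, acts as forward while the edge is chained, and falls back to hash when it is not. The decision rule "chained means forward, otherwise hash", the class `ForwardForLocalKeyByPartitionerSketch`, and the `selectChannel` signature are all hypothetical illustrations, not Flink's API.

```java
import java.util.function.ToIntBiFunction;

// Hypothetical sketch, not Flink's API: forward while the local-keyBy edge is
// chained, otherwise replaced by the wrapped hash partitioner.
class ForwardForLocalKeyByPartitionerSketch {

    enum Strategy { FORWARD, HASH }

    // Assumption: the hash partitioner the planner would have used for the groupBy.
    private final ToIntBiFunction<String, Integer> hashPartitioner;

    ForwardForLocalKeyByPartitionerSketch(ToIntBiFunction<String, Integer> hashPartitioner) {
        this.hashPartitioner = hashPartitioner;
    }

    // Hypothetical decision rule: keep forward only for a chained edge; otherwise
    // use hash, so the edge no longer relies on the upstream keyBy and the
    // downstream parallelism can be chosen independently.
    static Strategy resolve(boolean chained) {
        return chained ? Strategy.FORWARD : Strategy.HASH;
    }

    // Selects the downstream subtask index for one record.
    int selectChannel(String key, int upstreamSubtaskIndex, int numberOfChannels, boolean chained) {
        if (resolve(chained) == Strategy.FORWARD) {
            // Forward: same subtask index as upstream (equal parallelism assumed).
            return upstreamSubtaskIndex;
        }
        return hashPartitioner.applyAsInt(key, numberOfChannels);
    }

    public static void main(String[] args) {
        ForwardForLocalKeyByPartitionerSketch partitioner =
                new ForwardForLocalKeyByPartitionerSketch((key, n) -> Math.floorMod(key.hashCode(), n));
        System.out.println(partitioner.selectChannel("a", 2, 4, true));  // 2 (forward)
        System.out.println(partitioner.selectChannel("a", 2, 4, false)); // 1 ("a".hashCode() is 97)
    }
}
```

Falling back to the original hash partitioner, rather than to rescale/rebalance, is what keeps the result correct: the downstream operator again receives all records of a key on one subtask, whatever parallelism the scheduler picks.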

--
This message was sent by Atlassian Jira
(v8.20.1#820001)