You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2019/05/21 12:18:00 UTC

[jira] [Created] (FLINK-12575) Introduce planner rules to remove redundant shuffle and collation

godfrey he created FLINK-12575:
----------------------------------

             Summary: Introduce planner rules to remove redundant shuffle and collation
                 Key: FLINK-12575
                 URL: https://issues.apache.org/jira/browse/FLINK-12575
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Planner
            Reporter: godfrey he
            Assignee: godfrey he


{{Exchange}} and {{Sort}} is the most heavy operator, they are created in {{FlinkExpandConversionRule}} when some operators require its inputs to satisfy distribution trait or collation trait in planner rules. However, many operators could provide distribution or collation, e.g. {{BatchExecHashAggregate}} or {{BatchExecHashJoin}} could provide distribution on its shuffle keys, {{BatchExecSortMergeJoin}} could provide distribution and collation on its join keys. If the provided traits could satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant.
e.g. 
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

In above physical plan, the {{Exchange}}s between {{SortMergeJoin}}s are redundant due to their shuffle keys are same, the {{Sort}}s in the top tow {{SortMergeJoin}}s' left hand side are redundant due to its input is sorted.

another situation is the shuffle and collation could be removed between multiple {{Over}}s. e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
 FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w3$o0 RANG ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w1$o0 RANG ...], window#1=[RANK(*) AS w2$o0 RANG ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o1, $SUM0(a) AS w0$o0 RANG ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
the {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are redundant due to their shuffle keys and sort keys are same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)