You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2019/05/21 12:18:00 UTC
[jira] [Created] (FLINK-12575) Introduce planner rules to remove
redundant shuffle and collation
godfrey he created FLINK-12575:
----------------------------------
Summary: Introduce planner rules to remove redundant shuffle and collation
Key: FLINK-12575
URL: https://issues.apache.org/jira/browse/FLINK-12575
Project: Flink
Issue Type: New Feature
Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
{{Exchange}} and {{Sort}} is the most heavy operator, they are created in {{FlinkExpandConversionRule}} when some operators require its inputs to satisfy distribution trait or collation trait in planner rules. However, many operators could provide distribution or collation, e.g. {{BatchExecHashAggregate}} or {{BatchExecHashJoin}} could provide distribution on its shuffle keys, {{BatchExecSortMergeJoin}} could provide distribution and collation on its join keys. If the provided traits could satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant.
e.g.
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar
sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left outer join t2 on a1 = d1 and b1 = e1
the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
: :- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, e))], ...)
: : :- Exchange(distribution=[hash[a, b]])
: : : +- TableSourceScan(table=[[x]], ...)
: : +- Exchange(distribution=[hash[d, e]])
: : +- TableSourceScan(table=[[y]], ...)
: +- Exchange(distribution=[hash[a1, b1]])
: +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
+- TableSourceScan(table=[[t2]], ...)
{code}
In above physical plan, the {{Exchange}}s between {{SortMergeJoin}}s are redundant due to their shuffle keys are same, the {{Sort}}s in the top tow {{SortMergeJoin}}s' left hand side are redundant due to its input is sorted.
another situation is the shuffle and collation could be removed between multiple {{Over}}s. e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar
sql:
SELECT
COUNT(*) OVER (PARTITION BY c ORDER BY a),
SUM(a) OVER (PARTITION BY b ORDER BY a),
RANK() OVER (PARTITION BY c ORDER BY a, c),
SUM(a) OVER (PARTITION BY b ORDER BY a),
COUNT(*) OVER (PARTITION BY c ORDER BY c)
FROM MyTable
the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w3$o0 RANG ...])
+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w1$o0 RANG ...], window#1=[RANK(*) AS w2$o0 RANG ...], ...)
+- Sort(orderBy=[c ASC, a ASC])
+- Exchange(distribution=[hash[c]])
+- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o1, $SUM0(a) AS w0$o0 RANG ...], ...)
+- Sort(orderBy=[b ASC, a ASC])
+- Exchange(distribution=[hash[b]])
+- TableSourceScan(table=[[MyTable]], ...)
{code}
the {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are redundant due to their shuffle keys and sort keys are same.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)