Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2019/04/24 14:15:00 UTC

[jira] [Updated] (FLINK-12321) Supports sub-plan reuse

     [ https://issues.apache.org/jira/browse/FLINK-12321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-12321:
-------------------------------
    Issue Type: New Feature  (was: Bug)

> Supports sub-plan reuse
> -----------------------
>
>                 Key: FLINK-12321
>                 URL: https://issues.apache.org/jira/browse/FLINK-12321
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>
> Many query plans contain duplicated sub-plans whose computing logic is identical, so we could reuse the duplicated sub-plans to reduce computation.
> The digest of a RelNode is an identifier that distinguishes it from other RelNodes, and the digest of a sub-plan contains the digests of all its inner nodes. We could therefore use the digest of a sub-plan to find duplicated sub-plans.
> e.g. 
> {code:sql}
> WITH r AS (SELECT a FROM x LIMIT 10)
> SELECT r1.a FROM r r1, r r2 WHERE r1.a = r2.a
> {code}
> The physical plan after sub-plan reuse:
> {code}
> Calc(select=[a])
> +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
>    :- Exchange(distribution=[hash[a]])
>    :  +- Calc(select=[a], reuse_id=[1])
>    :     +- Limit(offset=[0], fetch=[10], global=[true])
>    :        +- Exchange(distribution=[single])
>    :           +- Limit(offset=[0], fetch=[10], global=[false])
>    :              +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>    +- Exchange(distribution=[broadcast])
>       +- Reused(reference_id=[1])
> {code}
> The sub-plan Calc-Limit-Exchange-Limit-TableSourceScan is reused.
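The digest-based lookup described above can be sketched roughly as follows. This is a minimal illustration, not the planner's code: `Node` is a hypothetical stand-in for a RelNode, the digest format is invented for the example, and the plan mirrors the query above before any reuse is applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SubPlanDigestSketch {

    /** Hypothetical stand-in for a RelNode: an operator description plus its inputs. */
    static final class Node {
        final String description;
        final List<Node> inputs;

        Node(String description, Node... inputs) {
            this.description = description;
            this.inputs = Arrays.asList(inputs);
        }

        /** A sub-plan's digest embeds the digests of all of its inputs (format is made up). */
        String digest() {
            return description + inputs.stream()
                    .map(Node::digest)
                    .collect(Collectors.joining(",", "{", "}"));
        }
    }

    /** Counts every sub-plan by digest and returns the digests seen more than once. */
    static List<String> duplicatedDigests(Node root) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        count(root, counts);
        List<String> duplicated = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicated.add(entry.getKey());
            }
        }
        return duplicated;
    }

    private static void count(Node node, Map<String, Integer> counts) {
        counts.merge(node.digest(), 1, Integer::sum);
        for (Node input : node.inputs) {
            count(input, counts);
        }
    }

    /** Builds a fresh copy of Calc-Limit-Exchange-Limit-TableSourceScan. */
    static Node reusablePart() {
        Node scan = new Node("TableSourceScan(x)");
        Node localLimit = new Node("Limit(local)", scan);
        Node single = new Node("Exchange(single)", localLimit);
        Node globalLimit = new Node("Limit(global)", single);
        return new Node("Calc", globalLimit);
    }

    /** The example plan with both join inputs still fully expanded. */
    static Node examplePlan() {
        Node left = new Node("Exchange(hash)", reusablePart());
        Node right = new Node("Exchange(broadcast)", reusablePart());
        return new Node("Calc", new Node("HashJoin", left, right));
    }

    public static void main(String[] args) {
        duplicatedDigests(examplePlan()).forEach(System.out::println);
    }
}
```

Every nested piece of a duplicated sub-plan is itself duplicated, so a real implementation would presumably keep only the largest match and replace the other occurrences with a reference to it.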
> For batch jobs, a reused node might lead to a deadlock when both inputs of a HashJoin or NestedLoopJoin come from the same reused node. The probe side of a HashJoin can start to read data only after the build side has finished. If there is no full-dam operator (DamBehavior#FULL_DAM, e.g. Sort or HashAggregate) on the probe side, the output of the reused node is blocked by the probe side, so the build side, which reads from the same node, cannot finish. In this case, we could set the Exchange node (adding a new one if none exists) to BATCH mode to break the deadlock. (The Exchange node then acts as a full-dam operator.)
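One way to picture the proposed deadlock break-up is the sketch below. It is only an illustration under stated assumptions: `Op` and `breakDeadlock` are hypothetical names rather than Flink classes, the probe side is modelled as a linear chain of single-input operators, and the Exchange is assumed to be the one on the probe side.

```java
class ReuseDeadlockSketch {

    /** Hypothetical operator model; none of these names are Flink classes. */
    static final class Op {
        final String name;
        final boolean fullDam;   // true for operators such as Sort or HashAggregate
        final boolean exchange;  // true for Exchange nodes
        final Op input;          // single input keeps the probe path linear
        boolean batchMode;       // a BATCH-mode Exchange behaves as a full dam

        Op(String name, boolean fullDam, boolean exchange, Op input) {
            this.name = name;
            this.fullDam = fullDam;
            this.exchange = exchange;
            this.input = input;
        }
    }

    /**
     * Walks the probe side from the join input down to the reused node.
     * If a full dam already sits on the path, nothing changes. Otherwise the
     * first Exchange on the path is switched to BATCH mode, or a new
     * BATCH-mode Exchange is added on top when the path has none.
     * Returns the (possibly new) root of the probe side.
     */
    static Op breakDeadlock(Op probeRoot, Op reused) {
        Op firstExchange = null;
        for (Op op = probeRoot; op != null && op != reused; op = op.input) {
            if (op.fullDam || (op.exchange && op.batchMode)) {
                return probeRoot; // data is already fully buffered on this path
            }
            if (op.exchange && firstExchange == null) {
                firstExchange = op;
            }
        }
        if (firstExchange != null) {
            firstExchange.batchMode = true;
            return probeRoot;
        }
        Op added = new Op("Exchange", false, true, probeRoot);
        added.batchMode = true;
        return added;
    }

    public static void main(String[] args) {
        // Probe side of the example plan: Exchange(hash) on top of the reused Calc.
        Op reused = new Op("Calc", false, false, null);
        Op exchange = new Op("Exchange", false, true, reused);
        Op root = breakDeadlock(exchange, reused);
        System.out.println(root.name + " batchMode=" + root.batchMode);
    }
}
```

In the example plan the join builds on the right input, so the probe side is `Exchange(distribution=[hash[a]])` over the reused Calc, and under these assumptions that Exchange is the node that would be switched to BATCH mode.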



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)