Posted to issues@flink.apache.org by "Kurt Young (JIRA)" <ji...@apache.org> on 2019/05/18 11:59:00 UTC

[jira] [Closed] (FLINK-12424) Supports dag (multiple-sinks query) optimization

     [ https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-12424.
------------------------------
       Resolution: Implemented
    Fix Version/s: 1.9.0

Merged in 1.9.0: e038a801a87f25b30a1b47ffe5710e5d9bd44c9b

> Supports dag (multiple-sinks query) optimization
> ------------------------------------------------
>
>                 Key: FLINK-12424
>                 URL: https://issues.apache.org/jira/browse/FLINK-12424
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>         Attachments: image-2019-05-07-13-33-02-793.png
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, the Flink planner optimizes the plan in the {{writeToSink}} method. If a query has more than one sink, each sink tree is optimized independently, and the resulting execution plans are also completely independent. As a result, a multiple-sinks query is very likely to compute the same sub-plan more than once. This issue aims to resolve that problem.
> The basic idea of the solution is as follows:
> 1. Lazy optimization: the {{writeToSink}} method no longer optimizes the plan; it only adds the plan to a collection.
> 2. Whole-plan optimization and execution: a new {{execute}} method is added to {{TableEnvironment}}; it triggers optimization of the whole plan and then executes the job.
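
The two steps above can be illustrated with a minimal sketch. It is not the Flink implementation: `LogicalPlan`, `SinkNode`, `LazyPlanner`, `pendingCount`, and the `optimizeAll` callback are hypothetical names introduced only for illustration. The sketch assumes that registering a sink merely buffers the sink tree and that a later `execute` call hands every buffered tree to the optimizer at once.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; none of these are Flink classes.
final case class LogicalPlan(description: String)
final case class SinkNode(name: String, input: LogicalPlan)

class LazyPlanner {
  // Sink trees registered so far; nothing is optimized at registration time.
  private val pendingSinks = ArrayBuffer.empty[SinkNode]

  // Step 1 (lazy optimization): only record the sink tree.
  def writeToSink(plan: LogicalPlan, sinkName: String): Unit =
    pendingSinks += SinkNode(sinkName, plan)

  // Number of sink trees waiting for optimization.
  def pendingCount: Int = pendingSinks.size

  // Step 2 (whole-plan optimization): pass all buffered sink trees to the
  // optimizer in a single call, then clear the buffer.
  def execute(optimizeAll: Seq[SinkNode] => Seq[String]): Seq[String] = {
    val physicalPlans = optimizeAll(pendingSinks.toList)
    pendingSinks.clear()
    physicalPlans
  }
}
```

Because the optimizer sees all sink trees together, it has the chance to detect sub-plans that several sinks share, which per-sink optimization cannot do.
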
> The basic idea of DAG (multiple-sinks query) optimization:
> 1. Decompose the DAG into blocks; the leaf block is the common sub-plan.
> 2. Optimize each block from the leaf block to the root blocks, so that each block only needs to be optimized once.
> e.g. 
> {code:scala}
> val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not null) t2 where a1 = b2")
> tableEnv.registerTable("TempTable", table)
> val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
> tableEnv.writeToSink(table1, Sink1)
> val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
> tableEnv.writeToSink(table2, Sink2)
> {code}
>  !image-2019-05-07-13-33-02-793.png! 
> The above plan will be decomposed into 3 blocks: block1 is the input of both block2 and block3, so block2 and block3 are optimized after block1 has been optimized.
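
The block decomposition described above can be sketched roughly as follows. This is a simplified illustration, not the planner's actual algorithm: `Node`, `BlockSketch`, `parentCounts`, and `blockOrder` are hypothetical names, nodes are assumed to have unique names, and a block boundary is assumed to be any node consumed by more than one parent (plus every sink).

```scala
import scala.collection.mutable

// Hypothetical plan node; assumed to be uniquely identified by its name.
final case class Node(name: String, inputs: List[Node])

object BlockSketch {
  // Count how many parents consume each node across all sink trees.
  def parentCounts(sinks: Seq[Node]): Map[String, Int] = {
    val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
    val visited = mutable.Set.empty[String]
    def visit(n: Node): Unit =
      if (visited.add(n.name)) n.inputs.foreach { in =>
        counts(in.name) += 1
        visit(in)
      }
    sinks.foreach(visit)
    counts.toMap
  }

  // Block roots in optimization order: leaf blocks first, each exactly once.
  def blockOrder(sinks: Seq[Node]): List[String] = {
    // A node with more than one parent is a common sub-plan, i.e. a block root.
    val shared = parentCounts(sinks).collect { case (name, c) if c > 1 => name }.toSet
    val sinkNames = sinks.map(_.name).toSet
    val order = mutable.LinkedHashSet.empty[String]
    val visited = mutable.Set.empty[String]
    def visit(n: Node): Unit =
      if (visited.add(n.name)) {
        n.inputs.foreach(visit) // inputs first, so leaf blocks precede their consumers
        if (shared(n.name) || sinkNames(n.name)) order += n.name
      }
    sinks.foreach(visit)
    order.toList
  }
}
```

For a plan shaped like the example, where one join node feeds two sink trees, `blockOrder` would list the join block first and the two sink blocks afterwards, matching the block1, block2, block3 order described in the issue.
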



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)