Posted to user-zh@flink.apache.org by 岳晗 <yu...@qq.com.INVALID> on 2021/11/24 07:52:23 UTC

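In the segmented optimization of the FlinkSQL source code, when the physical plan is converted to an ExecNodeGraph, what is the purpose of splitting with SameRelObjectShuttle and then merging with SubplanReuseShuttle?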

Hi,


When FlinkSQL converts the physical plan to an ExecNodeGraph, once optimizedRelNodes has been obtained,

it first runs SameRelObjectShuttle, which rewrites the same rel object to different rel objects.
e.g.
      Join                       Join
     /    \                     /    \
 Filter1 Filter2     =>     Filter1 Filter2
     \   /                     |      |
      Scan                   Scan1   Scan2



It then runs SubplanReuseShuttle:
       Join                      Join
     /      \                  /      \
 Filter1  Filter2          Filter1  Filter2
    |        |        =>       \     /
 Project1 Project2            Project1
    |        |                   |
  Scan1    Scan2               Scan1

val shuttle = new SameRelObjectShuttle()
val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
// reuse subplan
val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)
// convert FlinkPhysicalRel DAG to ExecNodeGraph
val generator = new ExecNodeGraphGenerator()
What is the purpose of this? Thanks in advance for any reply.

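Re: In the segmented optimization of the FlinkSQL source code, when the physical plan is converted to an ExecNodeGraph, what is the purpose of splitting with SameRelObjectShuttle and then merging with SubplanReuseShuttle?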

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

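This is because there are configuration options that can turn off subplan reuse and source reuse. The plan therefore has to be split apart first; we then check whether reuse is allowed, and the duplicated parts are merged only if it is.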
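
The split-then-conditionally-merge idea can be illustrated with a toy model. This is only a sketch, not Flink's implementation: `Node`, `split`, `reuse`, `countObjects`, and the `reuseEnabled` flag are hypothetical names standing in for RelNode, SameRelObjectShuttle, SubplanReuser, and the reuse switches (presumably the `table.optimizer.reuse-sub-plan-enabled` and `table.optimizer.reuse-source-enabled` options).

```scala
import java.util.{Collections, IdentityHashMap}
import scala.collection.mutable

// Toy model only; all names here are hypothetical, not Flink classes.
object PlanReuseSketch {

  // A plain class (not a case class), so equality is reference identity,
  // loosely mirroring how one rel object can be shared by two parents.
  final class Node(val name: String, val inputs: List[Node]) {
    // Structural digest: distinct objects with equal digests compute the same result.
    lazy val digest: String =
      if (inputs.isEmpty) name else s"$name(${inputs.map(_.digest).mkString(",")})"
  }

  private def identitySet() =
    Collections.newSetFromMap(new IdentityHashMap[Node, java.lang.Boolean]())

  // Step 1 (split): copy any object reached a second time, so nothing is shared.
  def split(root: Node): Node = {
    val visited = identitySet()
    def visit(n: Node): Node = {
      val seenBefore = !visited.add(n)
      val newInputs = n.inputs.map(visit)
      val changed = newInputs.zip(n.inputs).exists { case (a, b) => a ne b }
      if (seenBefore || changed) new Node(n.name, newInputs) else n
    }
    visit(root)
  }

  // Step 2 (merge): only when reuse is enabled, collapse nodes with equal
  // digests into a single shared object; otherwise leave the plan as a tree.
  def reuse(root: Node, reuseEnabled: Boolean): Node =
    if (!reuseEnabled) root
    else {
      val canonical = mutable.Map.empty[String, Node]
      def visit(n: Node): Node = canonical.get(n.digest) match {
        case Some(existing) => existing
        case None =>
          val rebuilt = new Node(n.name, n.inputs.map(visit))
          canonical(n.digest) = rebuilt
          rebuilt
      }
      visit(root)
    }

  // Number of distinct node objects reachable from root (by identity).
  def countObjects(root: Node): Int = {
    val seen = identitySet()
    def visit(n: Node): Unit = if (seen.add(n)) n.inputs.foreach(visit)
    visit(root)
    seen.size
  }

  // Join over two filters that share the very same Scan object.
  def sharedScanPlan(): Node = {
    val scan = new Node("Scan", Nil)
    new Node("Join", List(new Node("Filter1", List(scan)), new Node("Filter2", List(scan))))
  }

  def main(args: Array[String]): Unit = {
    val tree = split(sharedScanPlan())
    println(countObjects(tree))                               // 5: Scan was duplicated
    println(countObjects(reuse(tree, reuseEnabled = true)))   // 4: Scan merged again
    println(countObjects(reuse(tree, reuseEnabled = false)))  // 5: plan stays split
  }
}
```

Splitting first means the input to the merge step is always a plain tree, so whether anything ends up shared depends only on the flag and not on object sharing the optimizer happened to produce.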
