Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/01/11 11:44:00 UTC

[jira] [Commented] (FLINK-15555) Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option for subplaner reuse

    [ https://issues.apache.org/jira/browse/FLINK-15555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013449#comment-17013449 ] 

godfrey he commented on FLINK-15555:
------------------------------------

Hi [~hailong wang], if the build side and the probe side of a HashJoin or NestedLoopJoin share the same input, the DAG deadlocks. To break the deadlock, the planner sets the Exchange node on the probe side to BATCH shuffle mode (adding an Exchange node if there is none). In the plan from the issue description, the first Exchange node (reading from top to bottom) is set to BATCH mode. Here is a simpler example:

SQL (with TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED=true):
{code:java}
SELECT * FROM MyTable t1, MyTable t2 WHERE t1.a = t2.b
{code}

Plan with source reuse:
{code:java}
HashJoin(joinType=[InnerJoin], where=[=(a, b0)], select=[a, b, c, a0, b0, c0], build=[right])
:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[b]])
   +- Reused(reference_id=[1])
{code}
The total cost of the plan with source reuse is: *source_read_cost* + *write_source_data_to_disk_cost* + *read_source_data_from_disk_cost* + exchange_cost * 2 + join_cost
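
The deadlock-breaking rule described at the start of this comment can be illustrated with a small self-contained sketch. This is not the planner's actual code: the Node class, the ShuffleMode enum and the breakDeadlock method are hypothetical names made up for illustration, and the real planner operates on its own physical plan nodes. The sketch only assumes what the comment states: if both join inputs read from the same source, the probe-side Exchange becomes BATCH, and an Exchange is added when the probe side has none.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeadlockBreakSketch {
    public enum ShuffleMode { PIPELINED, BATCH }

    /** Hypothetical plan node; not a Flink planner class. */
    public static class Node {
        public final String name;
        public final List<Node> inputs = new ArrayList<>();
        // Only meaningful for Exchange nodes.
        public ShuffleMode shuffleMode = ShuffleMode.PIPELINED;

        public Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }
    }

    /** Collects the leaf nodes (sources) below n, compared by object identity. */
    public static Set<Node> sources(Node n) {
        Set<Node> out = new HashSet<>();
        if (n.inputs.isEmpty()) {
            out.add(n);
        } else {
            for (Node in : n.inputs) {
                out.addAll(sources(in));
            }
        }
        return out;
    }

    /**
     * If the probe and build sides of the join share a source, makes the
     * probe-side Exchange blocking. Returns true if the plan was changed.
     */
    public static boolean breakDeadlock(Node join, int probeIndex) {
        Node probe = join.inputs.get(probeIndex);
        Node build = join.inputs.get(1 - probeIndex);
        Set<Node> shared = sources(probe);
        shared.retainAll(sources(build));
        if (shared.isEmpty()) {
            return false; // inputs are independent, nothing to do
        }
        if (!probe.name.equals("Exchange")) {
            // No Exchange on the probe side yet: add one.
            probe = new Node("Exchange", probe);
            join.inputs.set(probeIndex, probe);
        }
        probe.shuffleMode = ShuffleMode.BATCH;
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the plan with source reuse: one scan feeds both sides,
        // build=[right], so the probe side is input 0.
        Node scan = new Node("TableSourceScan");
        Node probe = new Node("Exchange", scan);
        Node build = new Node("Exchange", scan);
        Node join = new Node("HashJoin", probe, build);
        breakDeadlock(join, 0);
        System.out.println("probe exchange: " + probe.shuffleMode);
        System.out.println("build exchange: " + build.shuffleMode);
    }
}
```

With a BATCH exchange the probe-side data is fully materialized before it is consumed, which is where the extra write and read disk costs in the formula come from.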

Plan without source reuse:
{code:java}
HashJoin(joinType=[InnerJoin], where=[=(a, b0)], select=[a, b, c, a0, b0, c0], build=[right])
:- Exchange(distribution=[hash[a]])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[b]])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
The total cost of the plan without source reuse is: *source_read_cost* * *2* + exchange_cost * 2 + join_cost

The planner cannot currently choose the cheaper of the two plans. If the table source reads local files, the plan without source reuse is the better plan, because *write_source_data_to_disk_cost* + *read_source_data_from_disk_cost* > *source_read_cost*. If the table source reads from a remote data source and reading is very expensive, the plan with source reuse may be the better plan, because *write_source_data_to_disk_cost* + *read_source_data_from_disk_cost* < *source_read_cost*.
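
To make the comparison concrete, here is a minimal sketch that evaluates both cost formulas. The class and method names are hypothetical, and the numbers in main are made-up units chosen only to show the two cases; they are not measurements and the planner does not expose such a cost model today.

```java
public class ReuseCostSketch {
    /** Plan with source reuse: one source read plus spilling the BATCH exchange to disk. */
    public static double costWithReuse(double sourceRead, double diskWrite, double diskRead,
                                       double exchange, double join) {
        return sourceRead + diskWrite + diskRead + 2 * exchange + join;
    }

    /** Plan without source reuse: the source is read twice, nothing is spilled. */
    public static double costWithoutReuse(double sourceRead, double exchange, double join) {
        return 2 * sourceRead + 2 * exchange + join;
    }

    /** Exchange and join costs cancel out, so only this inequality matters. */
    public static boolean reuseIsCheaper(double sourceRead, double diskWrite, double diskRead) {
        return diskWrite + diskRead < sourceRead;
    }

    public static void main(String[] args) {
        double diskWrite = 8, diskRead = 6, exchange = 5, join = 20; // illustrative units

        // Cheap source (for example local files): reading twice beats spilling.
        double localRead = 10;
        System.out.println("local, with reuse:     " + costWithReuse(localRead, diskWrite, diskRead, exchange, join));
        System.out.println("local, without reuse:  " + costWithoutReuse(localRead, exchange, join));
        System.out.println("local, reuse cheaper:  " + reuseIsCheaper(localRead, diskWrite, diskRead));

        // Expensive source (for example a remote system): spilling beats reading twice.
        double remoteRead = 100;
        System.out.println("remote, with reuse:    " + costWithReuse(remoteRead, diskWrite, diskRead, exchange, join));
        System.out.println("remote, without reuse: " + costWithoutReuse(remoteRead, exchange, join));
        System.out.println("remote, reuse cheaper: " + reuseIsCheaper(remoteRead, diskWrite, diskRead));
    }
}
```

Since the exchange and join terms are identical in both plans, the decision depends only on how the source read cost compares with the disk write plus disk read cost, and that is information the planner does not have per source.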

For now, we have not introduced a more fine-grained cost model for source reading, so users choose between the two plans through TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED.


> Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED  option for subplaner reuse
> -----------------------------------------------------------------------
>
>                 Key: FLINK-15555
>                 URL: https://issues.apache.org/jira/browse/FLINK-15555
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0
>            Reporter: hailong wang
>            Priority: Major
>             Fix For: 1.11.0
>
>
> The Blink planner supports sub-plan reuse. If TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED is true, the optimizer tries to find duplicated sub-plans and reuse them. And if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is true, the optimizer tries to find duplicated table sources and reuse them.
> The TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option is used to define whether a TableSourceScan should be reused.
> But if the parent RelNode of a TableSourceScan can be reused, the TableSourceScan is reused along with it even if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is false, as in the following SQL:
> {code:java}
> WITH t AS (SELECT a, b, e FROM x, y WHERE x.a = y.d)
> SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5
> {code}
> The plan may be as follows:
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, e, a0, b0, e0], build=[right])
> :- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
> :  +- Calc(select=[a, b, e])
> :     +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
> :        :- Exchange(distribution=[hash[a]])
> :        :  +- Calc(select=[a, b], where=[<(a, 10)])
> :        :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> :        +- Exchange(distribution=[hash[d]], reuse_id=[1])
> :           +- Calc(select=[d, e])
> :              +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> +- Exchange(distribution=[hash[e]])
>    +- Calc(select=[a, b, e])
>       +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
>          :- Exchange(distribution=[hash[a]])
>          :  +- Calc(select=[a, b], where=[>(a, 5)])
>          :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>          +- Reused(reference_id=[1])
> {code}
> So I think there is no need to define this option; TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED alone is enough.
>  


