Posted to issues@flink.apache.org by "zoucao (Jira)" <ji...@apache.org> on 2022/04/01 08:12:00 UTC

[jira] [Updated] (FLINK-26982) strike a balance between reusing the same RelNode and project/filter/limit/partition push-down

     [ https://issues.apache.org/jira/browse/FLINK-26982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zoucao updated FLINK-26982:
---------------------------
    Description: 
Flink already has effective logic for reusing identical RelNodes and subplans, but it loses efficacy when project/filter/limit/partition push-down is involved: once any of these optimizations is applied to a scan, the resulting source is no longer identical to the original one, so it cannot be reused.

For complicated SQL, many views may be derived from the same source table, and when the scan RelNodes cannot be reused, many identical source-reading threads are created within one task. This can cause memory problems and sometimes read amplification: a query that references the same table four times with different push-downs will read the underlying data four times.

Should we provide a way for users themselves to enforce the reuse of specific RelNodes?

The following SQL demonstrates the situation described above.

{code:java}
create table fs(
    a int,
    b string,
    c bigint
) partitioned by (c) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = 'file:///tmp/test'
);
select * from
   (select * from fs limit 1)
union all
   (select * from fs where a = 2)
union all
   (select 1, b, c from fs)
union all
   (select 1, b, c from fs where c = 1);
{code}
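
The plan below is the relevant part of the output of Flink's EXPLAIN statement run on the union query above:

{code:java}
-- print the optimized plan for the union query above
EXPLAIN PLAN FOR
select * from (select * from fs limit 1)
union all (select * from fs where a = 2)
union all (select 1, b, c from fs)
union all (select 1, b, c from fs where c = 1);
{code}

Note that each of the four TableSourceScan nodes carries a different digest (limit=[1], filter=[=(a, 2)], project=[b, c], partitions=[{c=1}]), which is exactly why the planner's digest-based reuse cannot merge them: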
== Optimized Execution Plan ==
{code:java}
Union(all=[true], union=[a, b, c])
:- Union(all=[true], union=[a, b, c])
:  :- Union(all=[true], union=[a, b, c])
:  :  :- Limit(offset=[0], fetch=[1])
:  :  :  +- Exchange(distribution=[single])
:  :  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, limit=[1]]], fields=[a, b, c])
:  :  +- Calc(select=[CAST(2 AS INTEGER) AS a, b, c], where=[(a = 2)])
:  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[=(a, 2)]]], fields=[a, b, c])
:  +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, c])
:     +- TableSourceScan(table=[[default_catalog, default_database, fs, project=[b, c], metadata=[]]], fields=[b, c])
+- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, CAST(1 AS BIGINT) AS c])
   +- TableSourceScan(table=[[default_catalog, default_database, fs, partitions=[{c=1}], project=[b], metadata=[]]], fields=[b])
{code}
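
Until per-RelNode control exists, a partial workaround is to disable the relevant push-down globally, so that all scans of the same table stay identical and the existing reuse logic can merge them. A minimal sketch using the existing optimizer options (note the trade-off: only predicate push-down has a global switch today; project/limit/partition push-down do not):

{code:java}
-- subplan/source reuse are enabled by default; shown for completeness
SET 'table.optimizer.reuse-sub-plan-enabled' = 'true';
SET 'table.optimizer.reuse-source-enabled' = 'true';
-- keep filters out of the scan digest, so filtered scans stay reusable
SET 'table.optimizer.source.predicate-pushdown-enabled' = 'false';
{code}

This only covers the filter case and gives up push-down for every table in the session, which is why a user-controlled, per-source reuse option would strike a better balance.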



>  strike a balance between reusing the same RelNode and project/filter/limit/partition push-down
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26982
>                 URL: https://issues.apache.org/jira/browse/FLINK-26982
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: zoucao
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)