You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Aitozi (Jira)" <ji...@apache.org> on 2023/02/24 02:05:00 UTC
[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree
Aitozi created FLINK-31205:
------------------------------
Summary: do optimize for multi sink in a single relNode tree
Key: FLINK-31205
URL: https://issues.apache.org/jira/browse/FLINK-31205
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Aitozi
Flink supports multi sink usage, but it optimize the each sink in a individual RelNode tree, this will miss some opportunity to do some cross tree optimization, eg:
{code:java}
create table newX(
a int,
b bigint,
c varchar,
d varchar,
e varchar
) with (
'connector' = 'values'
,'enable-projection-push-down' = 'true'
insert into sink_table select a, b from newX
insert into sink_table select a, 1 from newX
{code}
It will produce the plan as below, this will cause the source be consumed twice
{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a])
{code}
In this ticket, I propose to do a global optimization for the multi sink by
* Megre the multi sink(with same table) into a single relNode tree with an extra union node
* After optimization, split the merged union back to the original multi sink
In my poc, after step 1, it will produce the plan as below, I think it will do good for the global performacne
{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
:- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
+- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
+- Reused(reference_id=[1])
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)