Posted to dev@flink.apache.org by "Junning Liang (Jira)" <ji...@apache.org> on 2020/11/19 03:22:00 UTC
[jira] [Created] (FLINK-20231) Sql UDTF subplan reuse on correlate
Junning Liang created FLINK-20231:
-------------------------------------
Summary: Sql UDTF subplan reuse on correlate
Key: FLINK-20231
URL: https://issues.apache.org/jira/browse/FLINK-20231
Project: Flink
Issue Type: Improvement
Affects Versions: 1.10.1, 1.10.0
Reporter: Junning Liang
Hi all,
I would like to start a discussion about subplan reuse on Correlate.
While writing a test case for a UDTF with two sinks, I saw from the RelNode digests that no subplan other than the TableSourceScan was reused. The SQL is shown below.
{code:sql}
CREATE VIEW tempTable1 AS
SELECT name, age, habit, length
FROM sources, LATERAL TABLE(SplitStringUDTF(habits)) AS T(habit, length);
INSERT INTO sinks SELECT * FROM tempTable1;
INSERT INTO sinks1 SELECT * FROM tempTable1;
{code}
The RelNode digests of the two sinks are as follows.
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor1.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor2.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}
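As we can see, only the TableSourceScan subplan was reused; the two Correlate digests differ only in the correlation variable ($cor1 vs. $cor2). I also found a related test in SubplanReuseTest.scala, but it has carried a TODO since 2019.
I hope a solution can be proposed.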
{code:scala}
@Test
def testSubplanReuseOnCorrelate(): Unit = {
  util.addFunction("str_split", new StringSplit())
  val sqlQuery =
    """
      |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
      |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
    """.stripMargin
  // TODO the sub-plan of Correlate should be reused,
  // however the digests of Correlates are different
  util.verifyPlan(sqlQuery)
}
{code}
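To make the digest mismatch concrete, here is a minimal sketch in plain Java string handling. It is not Flink or Calcite code: the class name CorrelateDigestSketch and the normalize helper are hypothetical, and the digest strings are abbreviated from the Correlate lines above. It assumes the correlation variable id is the only difference between the two digests, and shows that rewriting $cor1 and $cor2 to a fixed placeholder makes them compare equal.

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink or Calcite.
class CorrelateDigestSketch {

    // Matches correlation variable names such as $cor1 or $cor2.
    private static final Pattern COR_VAR = Pattern.compile("\\$cor\\d+");

    // Replaces every correlation variable id with the fixed placeholder "$cor".
    public static String normalize(String digest) {
        return COR_VAR.matcher(digest).replaceAll(Matcher.quoteReplacement("$cor"));
    }

    public static void main(String[] args) {
        // Abbreviated versions of the two Correlate digests from the sink plans.
        String first = "Correlate(invocation=[SplitStringUDTF($cor1.habits)], joinType=[INNER])";
        String second = "Correlate(invocation=[SplitStringUDTF($cor2.habits)], joinType=[INNER])";

        // The raw digests differ, so the subplan is not recognised as reusable.
        System.out.println(first.equals(second)); // prints "false"

        // After normalizing the variable id, the digests are identical.
        System.out.println(normalize(first).equals(normalize(second))); // prints "true"
    }
}
{code}

A real fix would probably have to renumber correlation variables consistently where the planner computes digests, rather than patch strings, and would need to handle nested correlates with more than one variable; the sketch only illustrates the idea.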