Posted to dev@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/02/05 07:09:00 UTC
[jira] [Created] (FLINK-21290) Support Projection push down for Window TVF
Jark Wu created FLINK-21290:
-------------------------------
Summary: Support Projection push down for Window TVF
Key: FLINK-21290
URL: https://issues.apache.org/jira/browse/FLINK-21290
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Jark Wu
{code:scala}
@Test
def testTumble_ProjectionPushDown(): Unit = {
  // TODO: [b, c, e, proctime] are never used, should be pruned
  val sql =
    """
      |SELECT
      | a,
      | window_start,
      | window_end,
      | count(*),
      | sum(d)
      |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
      |GROUP BY a, window_start, window_end
    """.stripMargin
  util.verifyRelPlan(sql)
}
{code}
For the above test, the planner currently produces the following plan:
{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, d, rowtime])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
               +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
{code}
The planner should be able to prune the unused fields and produce the following plan:
{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
         +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
{code}
The reason is that we don't transpose the Project and the WindowTableFunction in the logical phase:
{code}
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
{code}
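The sketch below shows the index bookkeeping that transposing the Project with the window TVF would need. It is hypothetical and self-contained. The names `WindowTvfProjectionSketch`, `Pushdown` and `pushDown` are made up for illustration. It is not Flink's planner code and does not use Calcite's RelNode/RexNode APIs; field references are modelled as plain integers.

The sketch makes three assumptions:
* The TVF appends exactly the three columns shown in the rowType above (window_start, window_end, window_time).
* The DESCRIPTOR time column must always be kept.
* Only the top Project's references decide which other input columns survive.

{code:scala}
// Hypothetical sketch: index remapping when pushing a Project below a window TVF.
// Field references are plain Ints; this is not Calcite's RexInputRef API.
object WindowTvfProjectionSketch {

  // Columns appended by the TVF, per the rowType above:
  // window_start, window_end, window_time.
  val WindowColumnCount = 3

  final case class Pushdown(
      keptInputFields: Seq[Int], // input columns the new lower Project keeps
      newTimeColIndex: Int,      // DESCRIPTOR(...) index after pruning
      newTopRefs: Seq[Int])      // top Project references after remapping

  def pushDown(inputFieldCount: Int, timeColIndex: Int, topRefs: Seq[Int]): Pushdown = {
    // Keep the input columns referenced above the TVF, plus the time attribute.
    val kept = (topRefs.filter(_ < inputFieldCount) :+ timeColIndex).distinct.sorted
    // Old input index -> new input index.
    val inputMapping = kept.zipWithIndex.toMap
    val newInputCount = kept.size
    // Input refs use the mapping; window columns keep their order but shift left.
    def remap(ref: Int): Int =
      if (ref < inputFieldCount) inputMapping(ref)
      else newInputCount + (ref - inputFieldCount)
    Pushdown(kept, inputMapping(timeColIndex), topRefs.map(remap))
  }

  def main(args: Array[String]): Unit = {
    val inputFields = Seq("a", "b", "c", "d", "e", "rowtime", "proctime")
    // LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
    val result = pushDown(inputFields.size, timeColIndex = 5, topRefs = Seq(0, 7, 8, 3))
    println(result.keptInputFields.map(inputFields)) // List(a, d, rowtime)
    println(result.newTimeColIndex)                  // 2
    println(result.newTopRefs)                       // List(0, 3, 4, 1)
  }
}
{code}

Apply this to the logical plan above. The lower Project would keep only `a`, `d` and `rowtime`, which matches `fields=[a, d, rowtime]` in the expected plan. The DESCRIPTOR would reference `$2` instead of `$5`. The top Project would reference `$0, $3, $4, $1`.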
--
This message was sent by Atlassian Jira
(v8.3.4#803005)