Posted to dev@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/02/05 07:09:00 UTC

[jira] [Created] (FLINK-21290) Support Projection push down for Window TVF

Jark Wu created FLINK-21290:
-------------------------------

             Summary: Support Projection push down for Window TVF
                 Key: FLINK-21290
                 URL: https://issues.apache.org/jira/browse/FLINK-21290
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Jark Wu


{code:scala}
  @Test
  def testTumble_ProjectionPushDown(): Unit = {
    // TODO: [b, c, e, proctime] are never used, should be pruned
    val sql =
      """
        |SELECT
        |   a,
        |   window_start,
        |   window_end,
        |   count(*),
        |   sum(d)
        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
        |GROUP BY a, window_start, window_end
      """.stripMargin
    util.verifyRelPlan(sql)
  }
{code}

For the above test, we currently get the following plan:

{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, d, rowtime])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
               +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
{code}

The planner should be able to prune the unused fields [b, c, e, proctime] and produce the following plan:

{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
         +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
{code}

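The reason is that we do not transpose Project and WindowTableFunction in the logical phase, so the projection stays above the LogicalTableFunctionScan and cannot be pushed down towards the table scan: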

{code}
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
{code}
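
To illustrate what transposing Project and WindowTableFunction would need to compute, here is a minimal standalone sketch. It is not Flink planner code: `WindowTvfProjectionSketch`, `Pushed`, `pushProjection`, and the representation of rows as lists of field names are all hypothetical and chosen only for illustration. The idea is that the input of the window TVF only has to provide the fields referenced by the Project above it plus the time attribute column, and the Project above the TVF then has to be re-indexed against the narrower output.

```scala
// Hypothetical sketch, not Flink planner code: every name here is illustrative.
object WindowTvfProjectionSketch {

  // Columns the window TVF appends after its input columns (as in the logical plan above).
  val WindowColumns: Seq[String] = Seq("window_start", "window_end", "window_time")

  /** Result of pushing a projection below the window TVF. */
  final case class Pushed(
      inputFields: Seq[String], // fields the TVF input still has to provide
      timeColIndex: Int,        // index of the time attribute in the pruned input
      topProjection: Seq[Int]   // Project above the TVF, re-indexed against the new TVF output
  )

  def pushProjection(
      inputFields: Seq[String],
      timeCol: String,
      projected: Seq[String]): Pushed = {
    require(inputFields.contains(timeCol), s"unknown time column: $timeCol")

    // The time attribute must survive even if the query never selects it,
    // because the TVF needs it to assign windows.
    val used = projected.toSet + timeCol

    // Keep the original field order so the pruned input stays a stable sub-sequence.
    val kept = inputFields.filter(used.contains)

    // New TVF output: pruned input fields followed by the window columns.
    val tvfOutput = kept ++ WindowColumns

    val top = projected.map { name =>
      val idx = tvfOutput.indexOf(name)
      require(idx >= 0, s"unknown field: $name")
      idx
    }

    Pushed(kept, kept.indexOf(timeCol), top)
  }
}
```

Applied to the query above, `pushProjection(Seq("a", "b", "c", "d", "e", "rowtime", "proctime"), "rowtime", Seq("a", "window_start", "window_end", "d"))` keeps only `a`, `d` and `rowtime` as TVF input, which matches the `fields=[a, d, rowtime]` expected in the pruned plan. A real planner rule would additionally have to rewrite the TVF call and its row type, which this sketch leaves out.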


