Posted to issues@flink.apache.org by "Jing Zhang (Jira)" <ji...@apache.org> on 2022/01/11 07:26:00 UTC

[jira] [Created] (FLINK-25604) Remove useless aggregate function

Jing Zhang created FLINK-25604:
----------------------------------

             Summary: Remove useless aggregate function 
                 Key: FLINK-25604
                 URL: https://issues.apache.org/jira/browse/FLINK-25604
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Jing Zhang


We expect a useless aggregate call to be removed after projection push-down.
However, in some cases the planner does not remove it. For example,

{code:sql}
SELECT 
  d
FROM (
  SELECT
    d,
    c,
    row_number() OVER (PARTITION BY d ORDER BY e desc)  review_rank
  FROM (
    SELECT e, d, max(f) AS c FROM Table5 GROUP BY e, d)
  )
WHERE review_rank = 1
{code}

The resulting physical plan is:

{code:java}
Calc(select=[d], where=[=(w0$o0, 1:BIGINT)])
+- OverAggregate(partitionBy=[d], orderBy=[e DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[e, d, c, w0$o0])
   +- Sort(orderBy=[d ASC, e DESC])
      +- Exchange(distribution=[hash[d]])
         +- HashAggregate(isMerge=[true], groupBy=[e, d], select=[e, d, Final_MAX(max$0) AS c])
            +- Exchange(distribution=[hash[e, d]])
               +- LocalHashAggregate(groupBy=[e, d], select=[e, d, Partial_MAX(f) AS max$0])
                  +- Calc(select=[e, d, f])
                     +- BoundedStreamScan(table=[[default_catalog, default_database, Table5]], fields=[d, e, f, g, h])
{code}

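In the above SQL, the aggregate call max(f) (aliased as c) could be removed, because c is projected out before the sink. Only the grouping on e and d is still needed, since OverAggregate partitions by d and orders by e.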
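To make the expected optimization concrete, here is a minimal Java sketch of pruning aggregate calls whose outputs are not read by any parent operator. The Aggregate and AggCall records and the pruneUnusedCalls method are hypothetical simplifications for illustration only, not Flink or Calcite planner APIs. A real planner rule would also likely need to remap the field references of the operators above the aggregate once a column disappears.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of an aggregate node; NOT a Flink/Calcite API.
public class PruneUnusedAggCalls {

    /** One aggregate call, e.g. MAX(f) AS c. */
    record AggCall(String function, String argument, String alias) {}

    /** An aggregate node: GROUP BY keys plus the aggregate calls it computes. */
    record Aggregate(List<String> groupKeys, List<AggCall> calls) {}

    /**
     * Keeps only the aggregate calls whose output alias is referenced by a parent.
     * Grouping keys are always kept: dropping them would change the result rows.
     */
    static Aggregate pruneUnusedCalls(Aggregate agg, Set<String> usedFields) {
        List<AggCall> kept = new ArrayList<>();
        for (AggCall call : agg.calls()) {
            if (usedFields.contains(call.alias())) {
                kept.add(call);
            }
        }
        return new Aggregate(agg.groupKeys(), kept);
    }

    public static void main(String[] args) {
        // Models: SELECT e, d, max(f) AS c FROM Table5 GROUP BY e, d
        Aggregate agg = new Aggregate(
                List.of("e", "d"),
                List.of(new AggCall("MAX", "f", "c")));

        // OverAggregate reads d (partition key) and e (order key);
        // the final Calc projects only d, so c is never used.
        Set<String> used = Set.of("d", "e");

        Aggregate pruned = pruneUnusedCalls(agg, used);
        System.out.println("group keys: " + pruned.groupKeys());
        System.out.println("remaining calls: " + pruned.calls());
    }
}
{code}

With only d and e used above the aggregate, the sketch drops MAX(f) AS c and leaves a pure GROUP BY e, d, which is the shape the plan above would ideally have.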
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)