Posted to dev@flink.apache.org by "Qingsheng Ren (Jira)" <ji...@apache.org> on 2023/06/02 10:09:00 UTC

[jira] [Created] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

Qingsheng Ren created FLINK-32247:
-------------------------------------

             Summary: Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
                 Key: FLINK-32247
                 URL: https://issues.apache.org/jira/browse/FLINK-32247
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.1, 1.16.2, 1.18.0
            Reporter: Qingsheng Ren


Consider the SQL statement below:

 
{code:sql}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*)
FROM (
    SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum`
    FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
    GROUP BY `window_start`, `window_end`, `window_time`, `item`
)
GROUP BY `window_start`, `window_end`, `window_time`;
{code}
This should be a regular group aggregation following a windowed aggregation, but the planner interprets the outer aggregation as a GlobalWindowAggregate:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime])
                           +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code}
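
For anyone trying to reproduce the plan, a minimal sketch of a matching source table follows. The column names, the computed `rowtime` column and the 5-second watermark are read off the plan above; the column types and the datagen connector are assumptions and are not taken from the original setup.
{code:sql}
-- Hypothetical DDL: types and connector are assumed, not from the report.
CREATE TABLE source1 (
    `id`      BIGINT,
    `item`    STRING,
    `price`   DOUBLE,
    `ts`      STRING,
    -- Matches Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) in the plan
    `rowtime` AS TO_TIMESTAMP(`ts`),
    -- Matches watermark=[-(rowtime, 5000:INTERVAL SECOND)] in the plan
    WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
);

-- Print the plan for the query from the report.
EXPLAIN PLAN FOR
SELECT `window_start`, `window_end`, `window_time`, COUNT(*)
FROM (
    SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum`
    FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
    GROUP BY `window_start`, `window_end`, `window_time`, `item`
)
GROUP BY `window_start`, `window_end`, `window_time`;
{code}
Only the plan matters here, so the data the connector produces is irrelevant; any source with the same schema and watermark definition should do.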
 


