You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Qingsheng Ren (Jira)" <ji...@apache.org> on 2023/06/02 10:09:00 UTC
[jira] [Created] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
Qingsheng Ren created FLINK-32247:
-------------------------------------
Summary: Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
Key: FLINK-32247
URL: https://issues.apache.org/jira/browse/FLINK-32247
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: Qingsheng Ren
Considering a SQL statement below:
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; {code}
which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+- Exchange(distribution=[single])
+- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+- Calc(select=[window_start, window_end, window_time])
+- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+- Exchange(distribution=[hash[item]])
+- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime])
+- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)