Posted to issues@flink.apache.org by "Xintong Song (Jira)" <ji...@apache.org> on 2023/03/23 08:52:13 UTC

[jira] [Updated] (FLINK-30821) The optimized exec plan generated by OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of all-blocking is not as expected

     [ https://issues.apache.org/jira/browse/FLINK-30821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song updated FLINK-30821:
---------------------------------
    Fix Version/s: 1.18.0
                       (was: 1.17.0)

> The optimized exec plan generated by OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of all-blocking is not as expected
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30821
>                 URL: https://issues.apache.org/jira/browse/FLINK-30821
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Junrui Li
>            Priority: Major
>             Fix For: 1.16.2, 1.18.0
>
>
> In the all-blocking case, OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 generates the following optimized exec plan:
> {code:java}
> Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER) / w3$o0) AS INTEGER) AS EXPR$4])
> +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
>    +- Exchange(distribution=[forward])
>       +- Sort(orderBy=[c ASC, a ASC])
>          +- Exchange(distribution=[hash[c]])
>             +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
>                +- Exchange(distribution=[forward])
>                   +- Sort(orderBy=[b ASC, c ASC])
>                      +- Exchange(distribution=[hash[b]])
>                         +- OverAggregate(orderBy=[c ASC, a ASC], window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1])
>                            +- Exchange(distribution=[forward])
>                               +- Sort(orderBy=[c ASC, a ASC])
>                                  +- Exchange(distribution=[forward])
>                                     +- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
>                                        +- Sort(orderBy=[b ASC])
>                                           +- Exchange(distribution=[single])
>                                              +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>  {code}
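> However, the expected plan has an additional Exchange(distribution=[forward]) between the lowest OverAggregate(orderBy=[b ASC]) and its input Sort(orderBy=[b ASC]):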
> {code:java}
> Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER) / w3$o0) AS INTEGER) AS EXPR$4])
> +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
>    +- Exchange(distribution=[forward])
>       +- Sort(orderBy=[c ASC, a ASC])
>          +- Exchange(distribution=[hash[c]])
>             +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
>                +- Exchange(distribution=[forward])
>                   +- Sort(orderBy=[b ASC, c ASC])
>                      +- Exchange(distribution=[hash[b]])
>                         +- OverAggregate(orderBy=[c ASC, a ASC], window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1])
>                            +- Exchange(distribution=[forward])
>                               +- Sort(orderBy=[c ASC, a ASC])
>                                  +- Exchange(distribution=[forward])
>                                     +- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
>                                        +- Exchange(distribution=[forward])
>                                           +- Sort(orderBy=[b ASC])
>                                              +- Exchange(distribution=[single])
>                                                 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)