You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/30 07:58:57 UTC
[flink] branch release-1.10 updated: [FLINK-16070]
[table-planner-blink] Stream planner supports remove constant keys from an
aggregate
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 841c520 [FLINK-16070] [table-planner-blink] Stream planner supports remove constant keys from an aggregate
841c520 is described below
commit 841c5207e2bc0db9db5f34b4387311e40289a246
Author: godfrey he <go...@163.com>
AuthorDate: Mon Mar 30 15:58:29 2020 +0800
[FLINK-16070] [table-planner-blink] Stream planner supports remove constant keys from an aggregate
This closes #11558
---
.../planner/plan/rules/FlinkStreamRuleSets.scala | 2 +
.../plan/batch/sql/agg/HashAggregateTest.xml | 74 ++++++++++
.../plan/batch/sql/agg/SortAggregateTest.xml | 79 +++++++++++
.../table/planner/plan/stream/sql/RankTest.xml | 154 ++++++---------------
.../planner/plan/stream/sql/agg/AggregateTest.xml | 24 ++++
.../plan/stream/sql/agg/TwoStageAggregateTest.xml | 6 +-
.../plan/stream/sql/join/SemiAntiJoinTest.xml | 50 +++----
.../planner/plan/stream/table/AggregateTest.xml | 8 +-
.../plan/stream/table/TwoStageAggregateTest.xml | 12 +-
.../plan/batch/sql/agg/AggregateTestBase.scala | 9 ++
.../plan/stream/sql/agg/AggregateTest.scala | 9 ++
.../runtime/stream/sql/AggregateITCase.scala | 32 ++++-
12 files changed, 313 insertions(+), 146 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 7dc1e44..fa09738 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -109,6 +109,8 @@ object FlinkStreamRuleSets {
REWRITE_COALESCE_RULES.asScala ++
REDUCE_EXPRESSION_RULES.asScala ++
List(
+ //removes constant keys from an Agg
+ AggregateProjectPullUpConstantsRule.INSTANCE,
StreamLogicalWindowAggregateRule.INSTANCE,
// slices a project into sections which contain window agg functions
// and sections which do not.
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index 3734051..062a047 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -697,6 +697,80 @@ HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
]]>
</Resource>
</TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testMaxWithFixLengthType[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 68afff7..b9b901d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -715,6 +715,85 @@ SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
]]>
</Resource>
</TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+ +- Sort(orderBy=[a ASC])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+ +- Sort(orderBy=[a ASC])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testMaxWithFixLengthType[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 8b45c3a..c9ea8c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -56,13 +56,13 @@ LogicalProject(a=[$0], b=[$1], count_c=[$2], rank_num=[$3])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[single], updateAsRetraction=[false], accMode=[Acc])
- +- Calc(select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
- +- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
- +- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
+ +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+ +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+ +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+ +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
]]>
@@ -425,68 +425,6 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
]]>
</Resource>
</TestCase>
- <TestCase name="testTopNWithoutRowNumber2">
- <Resource name="sql">
- <![CDATA[
-SELECT
- CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
- seller_id,
- sku_id,
- venture,
- stat_date,
- amt_dtr,
- byr_cnt_dtr,
- pv_dtr,
- uv_dtr
-FROM (
- SELECT
- seller_id,
- sku_id,
- venture,
- stat_date,
- amt_dtr,
- byr_cnt_dtr,
- pv_dtr,
- uv_dtr,
- ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
- ORDER BY amt_dtr DESC) AS rownum
- FROM (
-SELECT
- seller_id
- ,sku_id
- ,venture
- ,stat_date
- ,incr_sum(trd_amt) AS amt_dtr
- ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
- ,SUM(log_pv) AS pv_dtr
- ,COUNT(DISTINCT log_visitor_id) AS uv_dtr
-FROM stream_source
-GROUP BY seller_id,sku_id,venture,stat_date
- )
-)
-WHERE rownum <= 10
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(rowkey=[CONCAT($0, $2, $3, $1)], seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7])
-+- LogicalFilter(condition=[<=($8, 10)])
- +- LogicalProject(seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7], rownum=[ROW_NUMBER() OVER (PARTITION BY $0, $2, $3 ORDER BY $4 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
- +- LogicalAggregate(group=[{0, 1, 2, 3}], amt_dtr=[INCR_SUM($4)], byr_cnt_dtr=[COUNT(DISTINCT $5)], pv_dtr=[SUM($6)], uv_dtr=[COUNT(DISTINCT $7)])
- +- LogicalTableScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CONCAT(seller_id, venture, stat_date, sku_id) AS rowkey, seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,1,2,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[seller_id, venture, stat_date], orderBy=[amt_dtr DESC], select=[seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[seller_id, venture, stat_date]], updateAsRetraction=[false], accMode=[Acc])
- +- GroupAggregate(groupBy=[seller_id, sku_id, venture, stat_date], select=[seller_id, sku_id, venture, stat_date, INCR_SUM(trd_amt) AS amt_dtr, COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr, SUM(log_pv) AS pv_dtr, COUNT(DISTINCT log_visitor_id) AS uv_dtr], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[seller_id, sku_id, venture, stat_date]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]], fields=[seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id], updateAsRetraction=[true], accMode=[Acc])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testTopNOrderByCount">
<Resource name="sql">
<![CDATA[
@@ -740,46 +678,6 @@ Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode
]]>
</Resource>
</TestCase>
- <TestCase name="testTopNWithGroupByConstantKey">
- <Resource name="sql">
- <![CDATA[
-SELECT *
-FROM (
- SELECT a, b, count_c,
- ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
- FROM (
-SELECT a, b, COUNT(*) AS count_c
-FROM (
-SELECT *, 'cn' AS cn
-FROM MyTable
-)
-GROUP BY a, b, cn
- ))
-WHERE row_num <= 10
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
-+- LogicalFilter(condition=[<=($3, 10)])
- +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
- +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
- +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
- +- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testTopNWithKeyChanged">
<Resource name="sql">
<![CDATA[
@@ -847,6 +745,46 @@ Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], updateAsRetraction=[false],
]]>
</Resource>
</TestCase>
+ <TestCase name="testTopNWithGroupByConstantKey">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM (
+ SELECT a, b, count_c,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
+ FROM (
+SELECT a, b, COUNT(*) AS count_c
+FROM (
+SELECT *, 'cn' AS cn
+FROM MyTable
+)
+GROUP BY a, b, cn
+ ))
+WHERE row_num <= 10
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
++- LogicalFilter(condition=[<=($3, 10)])
+ +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+ +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+ +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+ +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testUnarySortTopNOnString">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index dd9567d..390832a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -215,6 +215,30 @@ GroupAggregate(select=[AVG_RETRACT(a) AS EXPR$0], updateAsRetraction=[false], ac
]]>
</Resource>
</TestCase>
+ <TestCase name="testGroupByWithConstantKey">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM T) t GROUP BY a, c
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+ +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testGroupByWithoutWindow">
<Resource name="sql">
<![CDATA[SELECT COUNT(a) FROM MyTable GROUP BY b]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
index a441620..ddae158 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
@@ -172,9 +172,9 @@ LogicalProject(four=[$1], EXPR$1=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[4 AS four, EXPR$1])
-+- GlobalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(sum$0) AS EXPR$1])
- +- Exchange(distribution=[hash[b, four]])
- +- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
++- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$1])
+ +- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
+- Calc(select=[b, 4 AS four, a])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index 7ea2831..7ef21d9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -695,9 +695,9 @@ Calc(select=[b])
: : +- Calc(select=[1 AS EXPR$0])
: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[single])
- : +- Calc(select=[i])
- : +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
- : +- Exchange(distribution=[hash[EXPR$0, i]])
+ : +- Calc(select=[true AS i])
+ : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+ : +- Exchange(distribution=[hash[EXPR$0]])
: +- Calc(select=[1 AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d, f]])
@@ -1238,9 +1238,9 @@ Calc(select=[b])
: : +- Calc(select=[1 AS EXPR$0])
: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[single])
- : +- Calc(select=[i])
- : +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
- : +- Exchange(distribution=[hash[EXPR$0, i]])
+ : +- Calc(select=[true AS i])
+ : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+ : +- Exchange(distribution=[hash[EXPR$0]])
: +- Calc(select=[1 AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d]])
@@ -2274,12 +2274,12 @@ Calc(select=[b])
+- Join(joinType=[LeftAntiJoin], where=[AND(OR(=($f3, d), IS NULL(d)), =(c, f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[b, c, CASE(OR(=(c0, 0), AND(IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
- : +- Join(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ : +- Join(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: :- Exchange(distribution=[hash[a]])
: : +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : :- Exchange(distribution=[single])
: : : +- Calc(select=[a, b, c, c0, ck, i0])
- : : : +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ : : : +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : : : :- Exchange(distribution=[single])
@@ -2290,20 +2290,22 @@ Calc(select=[b])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
- : : : +- GroupAggregate(groupBy=[i, i0], select=[i, i0])
- : : : +- Exchange(distribution=[hash[i, i0]])
- : : : +- Calc(select=[i, true AS i0])
- : : : +- Reused(reference_id=[1])
+ : : : +- Calc(select=[i, true AS i0])
+ : : : +- GroupAggregate(groupBy=[i], select=[i])
+ : : : +- Exchange(distribution=[hash[i]])
+ : : : +- Calc(select=[i, true AS i0])
+ : : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[single])
: : +- GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS ck])
: : +- Exchange(distribution=[single])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Exchange(distribution=[hash[EXPR$0]])
- : +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
- : +- Exchange(distribution=[hash[EXPR$0, i]])
- : +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[EXPR$0, true AS i])
+ : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+ : +- Exchange(distribution=[hash[EXPR$0]])
+ : +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
+ : +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[f]])
+- Calc(select=[d, f])
+- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -2518,12 +2520,12 @@ Calc(select=[b])
+- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, e), IS NULL(b), IS NULL(e)), OR(=($f3, d), IS NULL(d)))], select=[b, $f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[single])
: +- Calc(select=[b, CASE(OR(=(c0, 0), AND(IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
- : +- Join(joinType=[LeftOuterJoin], where=[=(a, j)], select=[a, b, c0, ck, i0, c, ck0, j, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ : +- Join(joinType=[LeftOuterJoin], where=[=(a, j)], select=[a, b, c0, ck, i0, c, ck0, j, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: :- Exchange(distribution=[hash[a]])
: : +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i0, c, ck0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : :- Exchange(distribution=[single])
: : : +- Calc(select=[a, b, c0, ck, i0])
- : : : +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ : : : +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: : : : :- Exchange(distribution=[single])
@@ -2534,18 +2536,18 @@ Calc(select=[b])
: : : : +- Exchange(distribution=[single])
: : : : +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(i)]]], fields=[i], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
- : : : +- GroupAggregate(groupBy=[i, i0], select=[i, i0])
- : : : +- Exchange(distribution=[hash[i, i0]])
- : : : +- Calc(select=[i, true AS i0])
+ : : : +- Calc(select=[i, true AS i0])
+ : : : +- GroupAggregate(groupBy=[i], select=[i])
+ : : : +- Exchange(distribution=[hash[i]])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[single])
: : +- GroupAggregate(select=[COUNT(*) AS c, COUNT(j) AS ck])
: : +- Exchange(distribution=[single])
: : +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(j)]]], fields=[j], reuse_id=[2])
: +- Exchange(distribution=[hash[j]])
- : +- GroupAggregate(groupBy=[j, i], select=[j, i])
- : +- Exchange(distribution=[hash[j, i]])
- : +- Calc(select=[j, true AS i])
+ : +- Calc(select=[j, true AS i])
+ : +- GroupAggregate(groupBy=[j], select=[j])
+ : +- Exchange(distribution=[hash[j]])
: +- Reused(reference_id=[2])
+- Exchange(distribution=[single])
+- Calc(select=[e, d])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index 0542c55..b7c4a6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -128,8 +128,8 @@ LogicalProject(four=[$1], EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[4 AS four, EXPR$0])
-+- GroupAggregate(groupBy=[a, four], select=[a, four, SUM(b) AS EXPR$0])
- +- Exchange(distribution=[hash[a, four]])
++- GroupAggregate(groupBy=[a], select=[a, SUM(b) AS EXPR$0])
+ +- Exchange(distribution=[hash[a]])
+- Calc(select=[a, 4 AS four, b])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
@@ -147,8 +147,8 @@ LogicalProject(four=[$1], EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[4 AS four, EXPR$0])
-+- GroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS EXPR$0])
- +- Exchange(distribution=[hash[b, four]])
++- GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$0])
+ +- Exchange(distribution=[hash[b]])
+- Calc(select=[b, 4 AS four, a])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
index fc2f807..be5a7e3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
@@ -88,9 +88,9 @@ LogicalProject(four=[$1], EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[4 AS four, EXPR$0])
-+- GlobalGroupAggregate(groupBy=[a, four], select=[a, four, SUM(sum$0) AS EXPR$0])
- +- Exchange(distribution=[hash[a, four]])
- +- LocalGroupAggregate(groupBy=[a, four], select=[a, four, SUM(b) AS sum$0])
++- GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0])
+- Calc(select=[a, 4 AS four, b])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -109,9 +109,9 @@ LogicalProject(four=[$1], EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[4 AS four, EXPR$0])
-+- GlobalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(sum$0) AS EXPR$0])
- +- Exchange(distribution=[hash[b, four]])
- +- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
++- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
+- Calc(select=[b, 4 AS four, a])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 71c08a1..dddb109 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -199,5 +199,14 @@ abstract class AggregateTestBase extends TableTestBase {
util.verifyPlan("SELECT b, var_sum(a) FROM MyTable1 GROUP BY b")
}
+ @Test
+ def testGroupByWithConstantKey(): Unit = {
+ val sql =
+ """
+ |SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
// TODO supports group sets
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f923a2c..fc056ed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -260,4 +260,13 @@ class AggregateTest extends TableTestBase {
def testMaxWithRetract(): Unit = {
util.verifyPlanWithTrait("SELECT MAX(a) FROM (SELECT MAX(a) AS a FROM T GROUP BY b)")
}
+
+ @Test
+ def testGroupByWithConstantKey(): Unit = {
+ val sql =
+ """
+ |SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM T) t GROUP BY a, c
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 005aee7..b7478fa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
@@ -32,7 +33,7 @@ import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBa
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils._
-import org.apache.flink.table.planner.runtime.utils.{GenericAggregateFunction, StreamingWithAggTestBase, TestData, TestingRetractSink}
+import org.apache.flink.table.planner.runtime.utils.{GenericAggregateFunction, StreamingWithAggTestBase, TestData, TestingRetractSink, TestingUpsertTableSink}
import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => mLocalTime}
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo
import org.apache.flink.types.Row
@@ -1223,4 +1224,33 @@ class AggregateITCase(
results.addSink(sink).setParallelism(1)
env.execute()
}
+
+ @Test
+ def testConstantGroupKeyWithUpsertSink(): Unit = {
+ val data = new mutable.MutableList[(Int, Long, String)]
+ data.+=((1, 1L, "A"))
+ data.+=((2, 2L, "B"))
+ data.+=((3, 2L, "B"))
+ data.+=((4, 3L, "C"))
+ data.+=((5, 3L, "C"))
+
+ val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val tableSink = new TestingUpsertTableSink(Array(0)).configure(
+ Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, Types.LONG))
+ tEnv.registerTableSink("testSink", tableSink)
+
+ tEnv.sqlUpdate(
+ """
+ |insert into testSink
+ |select c, max(b) from
+ | (select b, c, true as f from MyTable) t
+ |group by c, f
+ """.stripMargin)
+ tEnv.execute("test")
+
+ val expected = List("A,1", "B,2", "C,3")
+ assertEquals(expected.sorted, tableSink.getUpsertResults.sorted)
+ }
}