You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/30 07:58:57 UTC

[flink] branch release-1.10 updated: [FLINK-16070] [table-planner-blink] Stream planner supports remove constant keys from an aggregate

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 841c520  [FLINK-16070] [table-planner-blink] Stream planner supports remove constant keys from an aggregate
841c520 is described below

commit 841c5207e2bc0db9db5f34b4387311e40289a246
Author: godfrey he <go...@163.com>
AuthorDate: Mon Mar 30 15:58:29 2020 +0800

    [FLINK-16070] [table-planner-blink] Stream planner supports remove constant keys from an aggregate
    
    
    This closes #11558
---
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   2 +
 .../plan/batch/sql/agg/HashAggregateTest.xml       |  74 ++++++++++
 .../plan/batch/sql/agg/SortAggregateTest.xml       |  79 +++++++++++
 .../table/planner/plan/stream/sql/RankTest.xml     | 154 ++++++---------------
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |  24 ++++
 .../plan/stream/sql/agg/TwoStageAggregateTest.xml  |   6 +-
 .../plan/stream/sql/join/SemiAntiJoinTest.xml      |  50 +++----
 .../planner/plan/stream/table/AggregateTest.xml    |   8 +-
 .../plan/stream/table/TwoStageAggregateTest.xml    |  12 +-
 .../plan/batch/sql/agg/AggregateTestBase.scala     |   9 ++
 .../plan/stream/sql/agg/AggregateTest.scala        |   9 ++
 .../runtime/stream/sql/AggregateITCase.scala       |  32 ++++-
 12 files changed, 313 insertions(+), 146 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 7dc1e44..fa09738 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -109,6 +109,8 @@ object FlinkStreamRuleSets {
       REWRITE_COALESCE_RULES.asScala ++
       REDUCE_EXPRESSION_RULES.asScala ++
       List(
+        // removes constant keys from an aggregate
+        AggregateProjectPullUpConstantsRule.INSTANCE,
         StreamLogicalWindowAggregateRule.INSTANCE,
         // slices a project into sections which contain window agg functions
         // and sections which do not.
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index 3734051..062a047 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -697,6 +697,80 @@ HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testMaxWithFixLengthType[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 68afff7..b9b901d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -715,6 +715,85 @@ SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+   +- Sort(orderBy=[a ASC])
+      +- Exchange(distribution=[hash[a]])
+         +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+            +- Sort(orderBy=[a ASC])
+               +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+   +- Sort(orderBy=[a ASC])
+      +- Exchange(distribution=[hash[a]])
+         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+   +- Sort(orderBy=[a ASC])
+      +- Exchange(distribution=[hash[a]])
+         +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+            +- Sort(orderBy=[a ASC])
+               +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testMaxWithFixLengthType[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 8b45c3a..c9ea8c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -56,13 +56,13 @@ LogicalProject(a=[$0], b=[$1], count_c=[$2], rank_num=[$3])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[single], updateAsRetraction=[false], accMode=[Acc])
-      +- Calc(select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
-         +- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
-            +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
-               +- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
-                  +- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
+      +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+         +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+            +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+               +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+                  +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
                      +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
                         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
 ]]>
@@ -425,68 +425,6 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTopNWithoutRowNumber2">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-    CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
-    seller_id,
-    sku_id,
-    venture,
-    stat_date,
-    amt_dtr,
-    byr_cnt_dtr,
-    pv_dtr,
-    uv_dtr
-FROM (
-  SELECT
-        seller_id,
-        sku_id,
-        venture,
-        stat_date,
-        amt_dtr,
-        byr_cnt_dtr,
-        pv_dtr,
-        uv_dtr,
-        ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
-           ORDER BY amt_dtr DESC) AS rownum
-  FROM (
-SELECT
-    seller_id
-    ,sku_id
-    ,venture
-    ,stat_date
-    ,incr_sum(trd_amt) AS amt_dtr
-    ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
-    ,SUM(log_pv) AS pv_dtr
-    ,COUNT(DISTINCT log_visitor_id) AS uv_dtr
-FROM stream_source
-GROUP BY seller_id,sku_id,venture,stat_date
-      )
-)
-WHERE rownum <= 10
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(rowkey=[CONCAT($0, $2, $3, $1)], seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7])
-+- LogicalFilter(condition=[<=($8, 10)])
-   +- LogicalProject(seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7], rownum=[ROW_NUMBER() OVER (PARTITION BY $0, $2, $3 ORDER BY $4 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
-      +- LogicalAggregate(group=[{0, 1, 2, 3}], amt_dtr=[INCR_SUM($4)], byr_cnt_dtr=[COUNT(DISTINCT $5)], pv_dtr=[SUM($6)], uv_dtr=[COUNT(DISTINCT $7)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[CONCAT(seller_id, venture, stat_date, sku_id) AS rowkey, seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,1,2,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[seller_id, venture, stat_date], orderBy=[amt_dtr DESC], select=[seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
-   +- Exchange(distribution=[hash[seller_id, venture, stat_date]], updateAsRetraction=[false], accMode=[Acc])
-      +- GroupAggregate(groupBy=[seller_id, sku_id, venture, stat_date], select=[seller_id, sku_id, venture, stat_date, INCR_SUM(trd_amt) AS amt_dtr, COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr, SUM(log_pv) AS pv_dtr, COUNT(DISTINCT log_visitor_id) AS uv_dtr], updateAsRetraction=[false], accMode=[Acc])
-         +- Exchange(distribution=[hash[seller_id, sku_id, venture, stat_date]], updateAsRetraction=[true], accMode=[Acc])
-            +- TableSourceScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]], fields=[seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id], updateAsRetraction=[true], accMode=[Acc])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testTopNOrderByCount">
     <Resource name="sql">
       <![CDATA[
@@ -740,46 +678,6 @@ Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTopNWithGroupByConstantKey">
-    <Resource name="sql">
-      <![CDATA[
-SELECT *
-FROM (
-  SELECT a, b, count_c,
-      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
-  FROM (
-SELECT a, b, COUNT(*) AS count_c
-FROM (
-SELECT *, 'cn' AS cn
-FROM MyTable
-)
-GROUP BY a, b, cn
-      ))
-WHERE row_num <= 10
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
-+- LogicalFilter(condition=[<=($3, 10)])
-   +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
-      +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
-         +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-   +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
-      +- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
-         +- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
-            +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
-               +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testTopNWithKeyChanged">
     <Resource name="sql">
       <![CDATA[
@@ -847,6 +745,46 @@ Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], updateAsRetraction=[false],
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTopNWithGroupByConstantKey">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM (
+  SELECT a, b, count_c,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
+  FROM (
+SELECT a, b, COUNT(*) AS count_c
+FROM (
+SELECT *, 'cn' AS cn
+FROM MyTable
+)
+GROUP BY a, b, cn
+      ))
+WHERE row_num <= 10
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
++- LogicalFilter(condition=[<=($3, 10)])
+   +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+         +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+   +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+      +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+         +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+            +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+               +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testUnarySortTopNOnString">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index dd9567d..390832a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -215,6 +215,30 @@ GroupAggregate(select=[AVG_RETRACT(a) AS EXPR$0], updateAsRetraction=[false], ac
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testGroupByWithConstantKey">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM T) t GROUP BY a, c
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+   +- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+         +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testGroupByWithoutWindow">
     <Resource name="sql">
       <![CDATA[SELECT COUNT(a) FROM MyTable GROUP BY b]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
index a441620..ddae158 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
@@ -172,9 +172,9 @@ LogicalProject(four=[$1], EXPR$1=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[4 AS four, EXPR$1])
-+- GlobalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(sum$0) AS EXPR$1])
-   +- Exchange(distribution=[hash[b, four]])
-      +- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
++- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$1])
+   +- Exchange(distribution=[hash[b]])
+      +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
          +- Calc(select=[b, 4 AS four, a])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index 7ea2831..7ef21d9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -695,9 +695,9 @@ Calc(select=[b])
    :        :              +- Calc(select=[1 AS EXPR$0])
    :        :                 +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
    :        +- Exchange(distribution=[single])
-   :           +- Calc(select=[i])
-   :              +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
-   :                 +- Exchange(distribution=[hash[EXPR$0, i]])
+   :           +- Calc(select=[true AS i])
+   :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+   :                 +- Exchange(distribution=[hash[EXPR$0]])
    :                    +- Calc(select=[1 AS EXPR$0, true AS i])
    :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d, f]])
@@ -1238,9 +1238,9 @@ Calc(select=[b])
    :        :              +- Calc(select=[1 AS EXPR$0])
    :        :                 +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
    :        +- Exchange(distribution=[single])
-   :           +- Calc(select=[i])
-   :              +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
-   :                 +- Exchange(distribution=[hash[EXPR$0, i]])
+   :           +- Calc(select=[true AS i])
+   :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+   :                 +- Exchange(distribution=[hash[EXPR$0]])
    :                    +- Calc(select=[1 AS EXPR$0, true AS i])
    :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d]])
@@ -2274,12 +2274,12 @@ Calc(select=[b])
 +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=($f3, d), IS NULL(d)), =(c, f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[c]])
    :  +- Calc(select=[b, c, CASE(OR(=(c0, 0), AND(IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
-   :     +- Join(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+   :     +- Join(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :- Exchange(distribution=[hash[a]])
    :        :  +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :- Exchange(distribution=[single])
    :        :     :  +- Calc(select=[a, b, c, c0, ck, i0])
-   :        :     :     +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+   :        :     :     +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :        :- Exchange(distribution=[hash[a]])
    :        :     :        :  +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :        :     :- Exchange(distribution=[single])
@@ -2290,20 +2290,22 @@ Calc(select=[b])
    :        :     :        :              +- Calc(select=[i])
    :        :     :        :                 +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
    :        :     :        +- Exchange(distribution=[hash[i]])
-   :        :     :           +- GroupAggregate(groupBy=[i, i0], select=[i, i0])
-   :        :     :              +- Exchange(distribution=[hash[i, i0]])
-   :        :     :                 +- Calc(select=[i, true AS i0])
-   :        :     :                    +- Reused(reference_id=[1])
+   :        :     :           +- Calc(select=[i, true AS i0])
+   :        :     :              +- GroupAggregate(groupBy=[i], select=[i])
+   :        :     :                 +- Exchange(distribution=[hash[i]])
+   :        :     :                    +- Calc(select=[i, true AS i0])
+   :        :     :                       +- Reused(reference_id=[1])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS ck])
    :        :           +- Exchange(distribution=[single])
    :        :              +- Calc(select=[CAST(j) AS EXPR$0])
    :        :                 +- Reused(reference_id=[1])
    :        +- Exchange(distribution=[hash[EXPR$0]])
-   :           +- GroupAggregate(groupBy=[EXPR$0, i], select=[EXPR$0, i])
-   :              +- Exchange(distribution=[hash[EXPR$0, i]])
-   :                 +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                    +- Reused(reference_id=[1])
+   :           +- Calc(select=[EXPR$0, true AS i])
+   :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+   :                 +- Exchange(distribution=[hash[EXPR$0]])
+   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
+   :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -2518,12 +2520,12 @@ Calc(select=[b])
 +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, e), IS NULL(b), IS NULL(e)), OR(=($f3, d), IS NULL(d)))], select=[b, $f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[single])
    :  +- Calc(select=[b, CASE(OR(=(c0, 0), AND(IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
-   :     +- Join(joinType=[LeftOuterJoin], where=[=(a, j)], select=[a, b, c0, ck, i0, c, ck0, j, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+   :     +- Join(joinType=[LeftOuterJoin], where=[=(a, j)], select=[a, b, c0, ck, i0, c, ck0, j, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :- Exchange(distribution=[hash[a]])
    :        :  +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i0, c, ck0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :- Exchange(distribution=[single])
    :        :     :  +- Calc(select=[a, b, c0, ck, i0])
-   :        :     :     +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+   :        :     :     +- Join(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :        :- Exchange(distribution=[hash[a]])
    :        :     :        :  +- Join(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    :        :     :        :     :- Exchange(distribution=[single])
@@ -2534,18 +2536,18 @@ Calc(select=[b])
    :        :     :        :           +- Exchange(distribution=[single])
    :        :     :        :              +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(i)]]], fields=[i], reuse_id=[1])
    :        :     :        +- Exchange(distribution=[hash[i]])
-   :        :     :           +- GroupAggregate(groupBy=[i, i0], select=[i, i0])
-   :        :     :              +- Exchange(distribution=[hash[i, i0]])
-   :        :     :                 +- Calc(select=[i, true AS i0])
+   :        :     :           +- Calc(select=[i, true AS i0])
+   :        :     :              +- GroupAggregate(groupBy=[i], select=[i])
+   :        :     :                 +- Exchange(distribution=[hash[i]])
    :        :     :                    +- Reused(reference_id=[1])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c, COUNT(j) AS ck])
    :        :           +- Exchange(distribution=[single])
    :        :              +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(j)]]], fields=[j], reuse_id=[2])
    :        +- Exchange(distribution=[hash[j]])
-   :           +- GroupAggregate(groupBy=[j, i], select=[j, i])
-   :              +- Exchange(distribution=[hash[j, i]])
-   :                 +- Calc(select=[j, true AS i])
+   :           +- Calc(select=[j, true AS i])
+   :              +- GroupAggregate(groupBy=[j], select=[j])
+   :                 +- Exchange(distribution=[hash[j]])
    :                    +- Reused(reference_id=[2])
    +- Exchange(distribution=[single])
       +- Calc(select=[e, d])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index 0542c55..b7c4a6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -128,8 +128,8 @@ LogicalProject(four=[$1], EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[4 AS four, EXPR$0])
-+- GroupAggregate(groupBy=[a, four], select=[a, four, SUM(b) AS EXPR$0])
-   +- Exchange(distribution=[hash[a, four]])
++- GroupAggregate(groupBy=[a], select=[a, SUM(b) AS EXPR$0])
+   +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, 4 AS four, b])
          +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -147,8 +147,8 @@ LogicalProject(four=[$1], EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[4 AS four, EXPR$0])
-+- GroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS EXPR$0])
-   +- Exchange(distribution=[hash[b, four]])
++- GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$0])
+   +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, 4 AS four, a])
          +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
index fc2f807..be5a7e3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
@@ -88,9 +88,9 @@ LogicalProject(four=[$1], EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[4 AS four, EXPR$0])
-+- GlobalGroupAggregate(groupBy=[a, four], select=[a, four, SUM(sum$0) AS EXPR$0])
-   +- Exchange(distribution=[hash[a, four]])
-      +- LocalGroupAggregate(groupBy=[a, four], select=[a, four, SUM(b) AS sum$0])
++- GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0])
          +- Calc(select=[a, 4 AS four, b])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -109,9 +109,9 @@ LogicalProject(four=[$1], EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[4 AS four, EXPR$0])
-+- GlobalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(sum$0) AS EXPR$0])
-   +- Exchange(distribution=[hash[b, four]])
-      +- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
++- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[b]])
+      +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
          +- Calc(select=[b, 4 AS four, a])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 71c08a1..dddb109 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -199,5 +199,14 @@ abstract class AggregateTestBase extends TableTestBase {
     util.verifyPlan("SELECT b, var_sum(a) FROM MyTable1 GROUP BY b")
   }
 
+  @Test
+  def testGroupByWithConstantKey(): Unit = {
+    val sql =
+      """
+        |SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
   // TODO supports group sets
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f923a2c..fc056ed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -260,4 +260,13 @@ class AggregateTest extends TableTestBase {
   def testMaxWithRetract(): Unit = {
     util.verifyPlanWithTrait("SELECT MAX(a) FROM (SELECT MAX(a) AS a FROM T GROUP BY b)")
   }
+
+  @Test
+  def testGroupByWithConstantKey(): Unit = {
+    val sql =
+      """
+        |SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM T) t GROUP BY a, c
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 005aee7..b7478fa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -32,7 +33,7 @@ import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBa
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils._
-import org.apache.flink.table.planner.runtime.utils.{GenericAggregateFunction, StreamingWithAggTestBase, TestData, TestingRetractSink}
+import org.apache.flink.table.planner.runtime.utils.{GenericAggregateFunction, StreamingWithAggTestBase, TestData, TestingRetractSink, TestingUpsertTableSink}
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => mLocalTime}
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo
 import org.apache.flink.types.Row
@@ -1223,4 +1224,33 @@ class AggregateITCase(
     results.addSink(sink).setParallelism(1)
     env.execute()
   }
+
+  @Test
+  def testConstantGroupKeyWithUpsertSink(): Unit = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "A"))
+    data.+=((2, 2L, "B"))
+    data.+=((3, 2L, "B"))
+    data.+=((4, 3L, "C"))
+    data.+=((5, 3L, "C"))
+
+    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val tableSink = new TestingUpsertTableSink(Array(0)).configure(
+      Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, Types.LONG))
+    tEnv.registerTableSink("testSink", tableSink)
+
+    tEnv.sqlUpdate(
+      """
+        |insert into testSink
+        |select c, max(b) from
+        | (select b, c, true as f from MyTable) t
+        |group by c, f
+      """.stripMargin)
+    tEnv.execute("test")
+
+    val expected = List("A,1", "B,2", "C,3")
+    assertEquals(expected.sorted, tableSink.getUpsertResults.sorted)
+  }
 }