You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/12 02:57:11 UTC

[flink] branch master updated: [FLINK-13563][table-planner-blink] TumblingGroupWindow should implement toString method to explain more info

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cda582  [FLINK-13563][table-planner-blink] TumblingGroupWindow should implement toString method to explain more info
0cda582 is described below

commit 0cda582487372d3f2717cb4fc7b0f8f818e18d03
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Aug 2 18:36:17 2019 +0800

    [FLINK-13563][table-planner-blink] TumblingGroupWindow should implement toString method to explain more info
    
    This closes #9347
---
 .../table/planner/plan/logical/groupWindows.scala  |   2 +
 .../apache/flink/table/api/stream/ExplainTest.xml  |   8 +-
 .../planner/plan/batch/sql/DagOptimizationTest.xml |   4 +-
 .../table/planner/plan/batch/sql/UnnestTest.xml    |   4 +-
 .../batch/sql/agg/AggregateReduceGroupingTest.xml  |  10 +-
 .../plan/batch/sql/agg/WindowAggregateTest.xml     | 312 ++++++++++-----------
 .../planner/plan/batch/table/GroupWindowTest.xml   |  30 +-
 .../logical/AggregateReduceGroupingRuleTest.xml    |  10 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml |  16 +-
 .../stream/sql/RelTimeIndicatorConverterTest.xml   |  12 +-
 .../planner/plan/stream/sql/TableSourceTest.xml    |   2 +-
 .../table/planner/plan/stream/sql/UnnestTest.xml   |   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |  88 +++---
 .../plan/stream/sql/join/WindowJoinTest.xml        |   4 +-
 .../planner/plan/stream/table/AggregateTest.xml    |   4 +-
 .../table/planner/plan/stream/table/CalcTest.xml   |   8 +-
 .../planner/plan/stream/table/GroupWindowTest.xml  |  48 ++--
 .../planner/plan/stream/table/TableSourceTest.xml  |   4 +-
 18 files changed, 285 insertions(+), 283 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
index 56c26a0..1f2c04a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
@@ -45,6 +45,8 @@ case class TumblingGroupWindow(
   extends LogicalWindow(
     alias,
     timeField) {
+
+  override def toString: String = s"TumblingGroupWindow($alias, $timeField, $size)"
 }
 
 // ------------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index f50b17e..62ac33d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -670,7 +670,7 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc]
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
 
 Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
       +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
@@ -717,7 +717,7 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 									ship_strategy : HASH
 
 									 : Operator
@@ -817,7 +817,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
       +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
          +- Reused(reference_id=[1])
@@ -864,7 +864,7 @@ Sink(name=[appendSink2], fields=[a, b])
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 									ship_strategy : HASH
 
 									 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
index acd9bb6..b13623b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
@@ -596,9 +596,9 @@ LogicalSink(name=[sink2], fields=[a, sum_c, time])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c], reuse_id=[1])
+HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 15000)], properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c], reuse_id=[1])
 +- Exchange(distribution=[hash[a]])
-   +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c) AS sum$0])
+   +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 15000)], properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c) AS sum$0])
       +- Calc(select=[ts, a, CAST(c) AS c])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, ts)]]], fields=[a, b, c, ts])
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index a263d16..0074eb9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -68,10 +68,10 @@ LogicalProject(b=[$0], s=[$2])
       <![CDATA[
 Calc(select=[b, f0 AS s])
 +- Correlate(invocation=[explode($cor0.set)], correlate=[table(explode($cor0.set))], select=[b,set,f0], rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], joinType=[INNER])
-   +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, Final_COLLECT(set) AS set])
+   +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, Final_COLLECT(set) AS set])
       +- Sort(orderBy=[b ASC, assignedWindow$ ASC])
          +- Exchange(distribution=[hash[b]])
-            +- LocalSortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, Partial_COLLECT(b) AS set])
+            +- LocalSortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, Partial_COLLECT(b) AS set])
                +- Sort(orderBy=[b ASC, rowtime ASC])
                   +- Calc(select=[b, rowtime], where=[<(b, 3)])
                      +- BoundedStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 7139daf..c8dc930 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -365,7 +365,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
 +- Exchange(distribution=[hash[a4]])
    +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -385,7 +385,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
 +- Exchange(distribution=[hash[a4]])
    +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -411,7 +411,7 @@ Calc(select=[a4, c4, s, EXPR$3])
    +- Exchange(distribution=[hash[a4, s]])
       +- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Partial_COUNT(b4) AS count$0])
          +- Calc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
-            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
                +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                   +- Exchange(distribution=[hash[a4]])
                      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
@@ -438,7 +438,7 @@ Calc(select=[a4, c4, e, EXPR$3])
    +- Exchange(distribution=[hash[a4, e]])
       +- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Partial_COUNT(b4) AS count$0])
          +- Calc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
-            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
                +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                   +- Exchange(distribution=[hash[a4]])
                      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
@@ -464,7 +464,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4
 +- Exchange(distribution=[hash[a4, b4]])
    +- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Partial_COUNT(*) AS count1$0])
       +- Calc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
-         +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+         +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
             +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                +- Exchange(distribution=[hash[a4]])
                   +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index d6d70c3..b91d909 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -40,9 +40,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
    +- Exchange(distribution=[single])
-      +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+      +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
          +- Calc(select=[ts, b, *(b, b) AS $f2])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
@@ -72,7 +72,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
    +- Exchange(distribution=[single])
       +- Calc(select=[ts, b, *(b, b) AS $f2])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -103,9 +103,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
    +- Exchange(distribution=[single])
-      +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+      +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
          +- Calc(select=[ts, b, *(b, b) AS $f2])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
@@ -329,9 +329,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
 +- Exchange(distribution=[single])
-   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
       +- Calc(select=[b, c, a])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
@@ -351,7 +351,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[AVG(c) AS EXPR$0, SUM(a) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[AVG(c) AS EXPR$0, SUM(a) AS EXPR$1])
 +- Exchange(distribution=[single])
    +- Calc(select=[b, c, a])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -372,9 +372,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
 +- Exchange(distribution=[single])
-   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
       +- Calc(select=[b, c, a])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
@@ -438,9 +438,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
 +- Exchange(distribution=[single])
-   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
       +- Calc(select=[ts, a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
@@ -460,7 +460,7 @@ LogicalProject(sumA=[$1], cntB=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[SUM(a) AS sumA, COUNT(b) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[SUM(a) AS sumA, COUNT(b) AS cntB])
 +- Exchange(distribution=[single])
    +- Calc(select=[ts, a, b])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -481,9 +481,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
 +- Exchange(distribution=[single])
-   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
       +- Calc(select=[ts, a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
@@ -601,9 +601,9 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
    +- Exchange(distribution=[hash[c]])
-      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
          +- Calc(select=[ts, c, a, b])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
@@ -633,7 +633,7 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA, MIN(b) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA, MIN(b) AS minB])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[ts, c, a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -664,14 +664,121 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
    +- Exchange(distribution=[hash[c]])
-      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
          +- Calc(select=[ts, c, a, b])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  SUM(correct) AS s,
+  AVG(correct) AS a,
+  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+  SELECT CASE a
+      WHEN 1 THEN 1
+      ELSE 99
+    END AS correct, b
+  FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
+   +- Exchange(distribution=[single])
+      +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
+         +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  SUM(correct) AS s,
+  AVG(correct) AS a,
+  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+  SELECT CASE a
+      WHEN 1 THEN 1
+      ELSE 99
+    END AS correct, b
+  FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  SUM(correct) AS s,
+  AVG(correct) AS a,
+  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+  SELECT CASE a
+      WHEN 1 THEN 1
+      ELSE 99
+    END AS correct, b
+  FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
+   +- Exchange(distribution=[single])
+      +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
+         +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSlidingWindowHashAgg[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[SELECT count(c) FROM MyTable1 GROUP BY b, HOP(ts, INTERVAL '3' SECOND, INTERVAL '1' HOUR)]]>
@@ -1012,9 +1119,9 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
    +- Exchange(distribution=[hash[a]])
-      +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_COUNT(c) AS count$0])
+      +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS count$0])
          +- Calc(select=[a, ts, c])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
@@ -1035,7 +1142,7 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, COUNT(c) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, COUNT(c) AS EXPR$0])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, ts, c])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1057,9 +1164,9 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
    +- Exchange(distribution=[hash[a]])
-      +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_COUNT(c) AS count$0])
+      +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS count$0])
          +- Calc(select=[a, ts, c])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
@@ -1080,9 +1187,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
    +- Exchange(distribution=[hash[a, d]])
-      +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+      +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
     </Resource>
@@ -1102,7 +1209,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
    +- Exchange(distribution=[hash[a, d]])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
@@ -1123,9 +1230,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
    +- Exchange(distribution=[hash[a, d]])
-      +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+      +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
     </Resource>
@@ -1144,10 +1251,10 @@ LogicalProject(wAvg=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg])
 +- Sort(orderBy=[assignedWindow$ ASC])
    +- Exchange(distribution=[single])
-      +- LocalSortWindowAggregate(window=[TumblingGroupWindow], select=[Partial_weightedAvg(b, a) AS wAvg])
+      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
          +- Sort(orderBy=[ts ASC])
             +- Calc(select=[ts, b, a])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1169,10 +1276,10 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
    +- Sort(orderBy=[a ASC, assignedWindow$ ASC])
       +- Exchange(distribution=[hash[a]])
-         +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+         +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS max$0])
             +- Sort(orderBy=[a ASC, ts ASC])
                +- Calc(select=[a, ts, c])
                   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1194,7 +1301,7 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, MAX(c) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, MAX(c) AS EXPR$0])
    +- Sort(orderBy=[a ASC, ts ASC])
       +- Exchange(distribution=[hash[a]])
          +- Calc(select=[a, ts, c])
@@ -1217,10 +1324,10 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
    +- Sort(orderBy=[a ASC, assignedWindow$ ASC])
       +- Exchange(distribution=[hash[a]])
-         +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+         +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS max$0])
             +- Sort(orderBy=[a ASC, ts ASC])
                +- Calc(select=[a, ts, c])
                   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1242,7 +1349,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
    +- Sort(orderBy=[a ASC, d ASC, b ASC])
       +- Exchange(distribution=[hash[a, d]])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1264,7 +1371,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
    +- Sort(orderBy=[a ASC, d ASC, b ASC])
       +- Exchange(distribution=[hash[a, d]])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1286,10 +1393,10 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_countFun(EXPR$1) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_countFun(EXPR$1) AS EXPR$1])
    +- Sort(orderBy=[a ASC, d ASC, assignedWindow$ ASC])
       +- Exchange(distribution=[hash[a, d]])
-         +- LocalSortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_countFun(a) AS EXPR$1])
+         +- LocalSortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_countFun(a) AS EXPR$1])
             +- Sort(orderBy=[a ASC, d ASC, b ASC])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
@@ -1310,9 +1417,9 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
    +- Exchange(distribution=[hash[c]])
-      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
+      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
          +- Calc(select=[ts, c])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
@@ -1332,7 +1439,7 @@ LogicalProject(wAvg=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[weightedAvg(b, a) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[weightedAvg(b, a) AS wAvg])
 +- Sort(orderBy=[ts ASC])
    +- Exchange(distribution=[single])
       +- Calc(select=[ts, b, a])
@@ -1355,7 +1462,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[ts, c])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1376,10 +1483,10 @@ LogicalProject(wAvg=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg])
 +- Sort(orderBy=[assignedWindow$ ASC])
    +- Exchange(distribution=[single])
-      +- LocalSortWindowAggregate(window=[TumblingGroupWindow], select=[Partial_weightedAvg(b, a) AS wAvg])
+      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
          +- Sort(orderBy=[ts ASC])
             +- Calc(select=[ts, b, a])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1401,119 +1508,12 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
    +- Exchange(distribution=[hash[c]])
-      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
+      +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
          +- Calc(select=[ts, c])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
     </Resource>
   </TestCase>
-    <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
-        <Resource name="sql">
-            <![CDATA[
-SELECT
-  SUM(correct) AS s,
-  AVG(correct) AS a,
-  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
-  SELECT CASE a
-      WHEN 1 THEN 1
-      ELSE 99
-    END AS correct, b
-  FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
-      ]]>
-        </Resource>
-        <Resource name="planBefore">
-            <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
-        </Resource>
-        <Resource name="planAfter">
-            <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
-   +- Exchange(distribution=[single])
-      +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
-         +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
-        </Resource>
-    </TestCase>
-    <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
-        <Resource name="sql">
-            <![CDATA[
-SELECT
-  SUM(correct) AS s,
-  AVG(correct) AS a,
-  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
-  SELECT CASE a
-      WHEN 1 THEN 1
-      ELSE 99
-    END AS correct, b
-  FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
-      ]]>
-        </Resource>
-        <Resource name="planBefore">
-            <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
-        </Resource>
-        <Resource name="planAfter">
-            <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
-   +- Exchange(distribution=[single])
-      +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
-        </Resource>
-    </TestCase>
-    <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
-        <Resource name="sql">
-            <![CDATA[
-SELECT
-  SUM(correct) AS s,
-  AVG(correct) AS a,
-  TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
-  SELECT CASE a
-      WHEN 1 THEN 1
-      ELSE 99
-    END AS correct, b
-  FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
-      ]]>
-        </Resource>
-        <Resource name="planBefore">
-            <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
-        </Resource>
-        <Resource name="planAfter">
-            <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
-   +- Exchange(distribution=[single])
-      +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
-         +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
-        </Resource>
-    </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
index e14de92..1b8adc1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
@@ -20,15 +20,15 @@ limitations under the License.
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Final_COUNT(count$0) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Partial_COUNT(int) AS count$0])
       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
 ]]>
     </Resource>
@@ -54,15 +54,15 @@ HashWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, long, 8, 10
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Final_COUNT(count$0) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
-   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Partial_COUNT(int) AS count$0])
+   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Partial_COUNT(int) AS count$0])
       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
 ]]>
     </Resource>
@@ -71,15 +71,15 @@ HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[stri
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3])
    +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
-   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
 ]]>
     </Resource>
@@ -88,16 +88,16 @@ HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
+SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
 +- Sort(orderBy=[string ASC, assignedWindow$ ASC])
    +- Exchange(distribution=[hash[string]])
-      +- LocalSortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Partial_myWeightedAvg(long, int) AS EXPR$0])
+      +- LocalSortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Partial_myWeightedAvg(long, int) AS EXPR$0])
          +- Sort(orderBy=[string ASC, long ASC])
             +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
 ]]>
@@ -107,15 +107,15 @@ SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[stri
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3])
    +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
-   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+   +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index d98cbd7..b017098 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -330,7 +330,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2)], window=[TumblingGroupWindow], properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
 +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
     </Resource>
@@ -349,7 +349,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow], properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
 +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
     </Resource>
@@ -372,7 +372,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 FlinkLogicalCalc(select=[a4, c4, s, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
    +- FlinkLogicalCalc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
-      +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+      +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime])
          +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -396,7 +396,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 FlinkLogicalCalc(select=[a4, c4, e, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
    +- FlinkLogicalCalc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
-      +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+      +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime])
          +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -419,7 +419,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
       <![CDATA[
 FlinkLogicalAggregate(group=[{0, 1}], c4=[AUXILIARY_GROUP($2)], EXPR$3=[COUNT()])
 +- FlinkLogicalCalc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
-   +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[])
+   +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
       +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 1dcbe67..b82a3d5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -71,7 +71,7 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, rowtime, a])
          +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
@@ -140,7 +140,7 @@ Calc(select=[b, w0$o0 AS $1])
          +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(rt, -(rt0, 5000:INTERVAL SECOND)), <=(rt, +(rt0, 10000:INTERVAL SECOND)))], select=[b, a, rt, b0, a0, rt0])
             :- Exchange(distribution=[hash[a]])
             :  +- Calc(select=[b, a, w$rowtime AS rt])
-            :     +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+            :     +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             :        +- Exchange(distribution=[hash[b]])
             :           +- Calc(select=[b, rowtime, a])
             :              +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
@@ -269,7 +269,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
       +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
 +- Exchange(distribution=[hash[id1]])
    +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
       +- Reused(reference_id=[1])
@@ -281,7 +281,7 @@ Sink(name=[appendSink1], fields=[a, b])
          +- Reused(reference_id=[2])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
       +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
          +- Reused(reference_id=[1])
@@ -329,7 +329,7 @@ Sink(name=[appendSink3], fields=[a, b])
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
 									ship_strategy : HASH
 
 									 : Operator
@@ -349,7 +349,7 @@ Sink(name=[appendSink3], fields=[a, b])
 													ship_strategy : FORWARD
 
 													 : Operator
-														content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+														content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 														ship_strategy : HASH
 
 														 : Operator
@@ -520,10 +520,10 @@ LogicalProject(b=[$0], EXPR$1=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, $SUM0(cnt) AS EXPR$1])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)], select=[b, $SUM0(cnt) AS EXPR$1])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[b, w$rowtime AS $f1, cnt])
-      +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+      +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
          +- Exchange(distribution=[hash[b]])
             +- Calc(select=[b, rowtime, a])
                +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 381c804..62f238d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -148,10 +148,10 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, $f0, 30000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- Calc(select=[w$rowtime AS $f0, long, int])
-         +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS int, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+         +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS int, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[hash[long]])
                +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
 ]]>
@@ -236,7 +236,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, long])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], select=[long, MIN(rowtime0) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0])
    +- Exchange(distribution=[hash[long]])
       +- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
@@ -286,7 +286,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
 ]]>
@@ -312,7 +312,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), w$end), 1:BIGINT)])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
@@ -341,7 +341,7 @@ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0], EXPR$2=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$end AS rowtime, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index df5adca..3d4f11f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -125,7 +125,7 @@ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[name, w$end AS EXPR$1, EXPR$2])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, rowtime, 600000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[name]])
       +- Calc(select=[name, rowtime, val], where=[>(val, 100)])
          +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index b42e44f..09ef6c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -68,7 +68,7 @@ LogicalProject(b=[$0], s=[$2])
       <![CDATA[
 Calc(select=[b, s])
 +- Correlate(invocation=[explode($cor0.set)], correlate=[table(explode($cor0.set))], select=[b,set,f0], rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], joinType=[INNER])
-   +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, COLLECT(b) AS set])
+   +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, COLLECT(b) AS set])
       +- Exchange(distribution=[hash[b]])
          +- Calc(select=[b, rowtime], where=[<(b, 3)])
             +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index c88f676..45f4725 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -41,7 +41,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, c, *(c, c) AS $f2])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -68,7 +68,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0), 60000:INTERVAL MINUTE)])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, +(w$end, 60000:INTERVAL MINUTE) AS EXPR$1])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -160,7 +160,7 @@ LogicalProject(EXPR$0=[$1])
 Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b]])
-      +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], select=[a, b, c])
+      +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c])
          +- Exchange(distribution=[hash[a, b, c]])
             +- Calc(select=[a, b, c, rowtime])
                +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -297,10 +297,10 @@ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)], EXPR$1=[TUMBLE_END($0)], a=[$1])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[w$rowtime AS EXPR$0, w$end AS EXPR$1, a])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f0, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[w$rowtime AS $f0, a])
-         +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+         +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 2)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[single])
                +- Calc(select=[rowtime, a])
                   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -333,13 +333,48 @@ LogicalProject(EXPR$0=[$2])
 Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b, d], select=[b, d, weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b, d]])
-      +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], select=[a, b, c, COUNT(*) AS d])
+      +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c, COUNT(*) AS d])
          +- Exchange(distribution=[hash[a, b, c]])
             +- Calc(select=[a, b, c, rowtime])
                +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testReturnTypeInferenceForWindowAgg">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  SUM(correct) AS s,
+  AVG(correct) AS a,
+  TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
+FROM (
+  SELECT CASE a
+      WHEN 1 THEN 1
+      ELSE 99
+    END AS correct, rowtime
+  FROM MyTable
+)
+GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSessionFunction">
     <Resource name="sql">
       <![CDATA[
@@ -396,7 +431,7 @@ Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b, d, ping_start]])
       +- Calc(select=[b, d, w$start AS ping_start, c, a])
-         +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+         +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[hash[a, b, c]])
                +- Calc(select=[a, b, c, rowtime])
                   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -431,7 +466,7 @@ Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b, ping_start], select=[b, ping_start, weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b, ping_start]])
       +- Calc(select=[b, w$start AS ping_start, c, a])
-         +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+         +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[hash[a, b, c]])
                +- Calc(select=[a, b, c, rowtime])
                   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -460,46 +495,11 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, c, a])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-    <TestCase name="testReturnTypeInferenceForWindowAgg">
-        <Resource name="sql">
-            <![CDATA[
-SELECT
-  SUM(correct) AS s,
-  AVG(correct) AS a,
-  TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
-FROM (
-  SELECT CASE a
-      WHEN 1 THEN 1
-      ELSE 99
-    END AS correct, rowtime
-  FROM MyTable
-)
-GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
-      ]]>
-        </Resource>
-        <Resource name="planBefore">
-            <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-        </Resource>
-        <Resource name="planAfter">
-            <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-   +- Exchange(distribution=[single])
-      +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
-         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
-        </Resource>
-    </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 5053e8d..ba5ea52 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -217,7 +217,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[rowtime, b, a0, b0])
       +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0), 600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
@@ -403,7 +403,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
+GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0, 21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
 +- Exchange(distribution=[hash[b0]])
    +- Calc(select=[rowtime0, b0, a, b])
       +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0), 600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index d5b8a85..95b1683 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -53,13 +53,13 @@ GroupAggregate(groupBy=[b], select=[b, COUNT(a) AS TMP_0])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(DISTINCT a) AS EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)], select=[COUNT(DISTINCT a) AS EXPR$0, SUM(a) AS EXPR$1])
 +- Exchange(distribution=[single])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
index 93d56b8..2a44ce2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
@@ -151,7 +151,7 @@ Calc(select=[a AS a2, b AS b2])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
-+- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], $f5=[UPPER($2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
@@ -159,7 +159,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[EXPR$0, EXPR$1, b])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w, rowtime, 5)], select=[b, COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
@@ -170,14 +170,14 @@ Calc(select=[EXPR$0, EXPR$1, b])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], $f5=[UPPER($2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
 +- Exchange(distribution=[single])
    +- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
index ae8ceea..165c523 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
@@ -68,13 +68,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, rowtime, 8, 10)], select=[CO
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
 ]]>
@@ -100,13 +100,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, proctime, 2, 1)], select=[CO
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
 ]]>
@@ -132,13 +132,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, proctime, 50, 50)], select=[
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 2)], select=[COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
 ]]>
@@ -148,13 +148,13 @@ GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0]
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
 ]]>
@@ -164,14 +164,14 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3], EXPR$4=[$4], EXPR$5=[$5])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)], EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)], window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)], EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)], window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[EXPR$4, EXPR$5])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, EXPR$4, EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, a, b, c, *(c, c) AS $f4])
          +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, a, b, c])
@@ -278,13 +278,13 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, rowtime, 8
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], select=[string, COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
 ]]>
@@ -294,13 +294,13 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, myWeightedAvg(long, int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], select=[string, myWeightedAvg(long, int) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
 ]]>
@@ -328,7 +328,7 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, rowtime, 1
 LogicalProject(EXPR$0=[$0])
 +- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[SlidingGroupWindow('w2, proctime, 20, 10)], properties=[])
    +- LogicalProject(proctime=[AS($2, _UTF-16LE'proctime')], string=[$0], EXPR$1=[$1])
-      +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$0])
+      +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)], window=[TumblingGroupWindow('w1, proctime, 50)], properties=[EXPR$0])
          +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
@@ -336,7 +336,7 @@ LogicalProject(EXPR$0=[$0])
       <![CDATA[
 GroupWindowAggregate(window=[SlidingGroupWindow('w2, proctime, 20, 10)], select=[COUNT(string) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1, proctime('w1) AS EXPR$0])
+   +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w1, proctime, 50)], properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1, proctime('w1) AS EXPR$0])
       +- Exchange(distribution=[hash[string]])
          +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
 ]]>
@@ -362,13 +362,13 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, proctime,
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 2)], select=[string, COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
 ]]>
@@ -378,13 +378,13 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
 ]]>
@@ -428,13 +428,13 @@ Calc(select=[EXPR$0])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
 +- Exchange(distribution=[hash[string]])
    +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
 ]]>
@@ -444,14 +444,14 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(string=[$0], s1=[AS(+($1, 1), _UTF-16LE's1')], s2=[AS(+($1, 3), _UTF-16LE's2')], x=[AS($2, _UTF-16LE'x')], x2=[AS($2, _UTF-16LE'x2')], x3=[AS($3, _UTF-16LE'x3')], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[string, +(EXPR$0, 1) AS s1, +(EXPR$0, 3) AS s2, EXPR$1 AS x, EXPR$1 AS x2, EXPR$2 AS x3, EXPR$2])
-+- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
++- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
    +- Exchange(distribution=[hash[string]])
       +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index fcdf107..e74e59b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -107,7 +107,7 @@ Calc(select=[name, val, id])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow], properties=[EXPR$0])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
    +- LogicalFilter(condition=[>($2, 100)])
       +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
 ]]>
@@ -115,7 +115,7 @@ LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[name, EXPR$0, EXPR$1])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
    +- Exchange(distribution=[hash[name]])
       +- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
          +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])