You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/12 02:57:11 UTC
[flink] branch master updated: [FLINK-13563][table-planner-blink]
TumblingGroupWindow should implement toString method to explain more info
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0cda582 [FLINK-13563][table-planner-blink] TumblingGroupWindow should implement toString method to explain more info
0cda582 is described below
commit 0cda582487372d3f2717cb4fc7b0f8f818e18d03
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Aug 2 18:36:17 2019 +0800
[FLINK-13563][table-planner-blink] TumblingGroupWindow should implement toString method to explain more info
This closes #9347
---
.../table/planner/plan/logical/groupWindows.scala | 2 +
.../apache/flink/table/api/stream/ExplainTest.xml | 8 +-
.../planner/plan/batch/sql/DagOptimizationTest.xml | 4 +-
.../table/planner/plan/batch/sql/UnnestTest.xml | 4 +-
.../batch/sql/agg/AggregateReduceGroupingTest.xml | 10 +-
.../plan/batch/sql/agg/WindowAggregateTest.xml | 312 ++++++++++-----------
.../planner/plan/batch/table/GroupWindowTest.xml | 30 +-
.../logical/AggregateReduceGroupingRuleTest.xml | 10 +-
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 16 +-
.../stream/sql/RelTimeIndicatorConverterTest.xml | 12 +-
.../planner/plan/stream/sql/TableSourceTest.xml | 2 +-
.../table/planner/plan/stream/sql/UnnestTest.xml | 2 +-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 88 +++---
.../plan/stream/sql/join/WindowJoinTest.xml | 4 +-
.../planner/plan/stream/table/AggregateTest.xml | 4 +-
.../table/planner/plan/stream/table/CalcTest.xml | 8 +-
.../planner/plan/stream/table/GroupWindowTest.xml | 48 ++--
.../planner/plan/stream/table/TableSourceTest.xml | 4 +-
18 files changed, 285 insertions(+), 283 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
index 56c26a0..1f2c04a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
@@ -45,6 +45,8 @@ case class TumblingGroupWindow(
extends LogicalWindow(
alias,
timeField) {
+
+ override def toString: String = s"TumblingGroupWindow($alias, $timeField, $size)"
}
// ------------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index f50b17e..62ac33d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -670,7 +670,7 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc]
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Reused(reference_id=[1])
@@ -717,7 +717,7 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
ship_strategy : FORWARD
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -817,7 +817,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
@@ -864,7 +864,7 @@ Sink(name=[appendSink2], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
index acd9bb6..b13623b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
@@ -596,9 +596,9 @@ LogicalSink(name=[sink2], fields=[a, sum_c, time])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c], reuse_id=[1])
+HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 15000)], properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c], reuse_id=[1])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c) AS sum$0])
+ +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 15000)], properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c) AS sum$0])
+- Calc(select=[ts, a, CAST(c) AS c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, ts)]]], fields=[a, b, c, ts])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index a263d16..0074eb9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -68,10 +68,10 @@ LogicalProject(b=[$0], s=[$2])
<![CDATA[
Calc(select=[b, f0 AS s])
+- Correlate(invocation=[explode($cor0.set)], correlate=[table(explode($cor0.set))], select=[b,set,f0], rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], joinType=[INNER])
- +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, Final_COLLECT(set) AS set])
+ +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, Final_COLLECT(set) AS set])
+- Sort(orderBy=[b ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[b]])
- +- LocalSortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, Partial_COLLECT(b) AS set])
+ +- LocalSortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, Partial_COLLECT(b) AS set])
+- Sort(orderBy=[b ASC, rowtime ASC])
+- Calc(select=[b, rowtime], where=[<(b, 3)])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 7139daf..c8dc930 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -365,7 +365,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -385,7 +385,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -411,7 +411,7 @@ Calc(select=[a4, c4, s, EXPR$3])
+- Exchange(distribution=[hash[a4, s]])
+- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
@@ -438,7 +438,7 @@ Calc(select=[a4, c4, e, EXPR$3])
+- Exchange(distribution=[hash[a4, e]])
+- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
@@ -464,7 +464,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4
+- Exchange(distribution=[hash[a4, b4]])
+- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index d6d70c3..b91d909 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -40,9 +40,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -72,7 +72,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
+- Exchange(distribution=[single])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -103,9 +103,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -329,9 +329,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -351,7 +351,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[AVG(c) AS EXPR$0, SUM(a) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[AVG(c) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -372,9 +372,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -438,9 +438,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -460,7 +460,7 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[SUM(a) AS sumA, COUNT(b) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[SUM(a) AS sumA, COUNT(b) AS cntB])
+- Exchange(distribution=[single])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -481,9 +481,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -601,9 +601,9 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+ +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -633,7 +633,7 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA, MIN(b) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA, MIN(b) AS minB])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -664,14 +664,121 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA, minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA, Final_MIN(min$1) AS minB])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+ +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
</Resource>
</TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
+ +- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
+ +- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSlidingWindowHashAgg[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[SELECT count(c) FROM MyTable1 GROUP BY b, HOP(ts, INTERVAL '3' SECOND, INTERVAL '1' HOUR)]]>
@@ -1012,9 +1119,9 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_COUNT(c) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS count$0])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -1035,7 +1142,7 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, COUNT(c) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, COUNT(c) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1057,9 +1164,9 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_COUNT(c) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS count$0])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -1080,9 +1187,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
- +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+ +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
@@ -1102,7 +1209,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -1123,9 +1230,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
- +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+ +- LocalHashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_COUNT(a) AS count$2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
@@ -1144,10 +1251,10 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg])
+- Sort(orderBy=[assignedWindow$ ASC])
+- Exchange(distribution=[single])
- +- LocalSortWindowAggregate(window=[TumblingGroupWindow], select=[Partial_weightedAvg(b, a) AS wAvg])
+ +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Calc(select=[ts, b, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1169,10 +1276,10 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
+- Sort(orderBy=[a ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a]])
- +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+ +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS max$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1194,7 +1301,7 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, MAX(c) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, MAX(c) AS EXPR$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, ts, c])
@@ -1217,10 +1324,10 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
+- Sort(orderBy=[a ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a]])
- +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+ +- LocalSortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS max$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1242,7 +1349,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1264,7 +1371,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1286,10 +1393,10 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_countFun(EXPR$1) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_countFun(EXPR$1) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a, d]])
- +- LocalSortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_countFun(a) AS EXPR$1])
+ +- LocalSortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS (sum$0, count$1), Partial_countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -1310,9 +1417,9 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
+ +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -1332,7 +1439,7 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[weightedAvg(b, a) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Exchange(distribution=[single])
+- Calc(select=[ts, b, a])
@@ -1355,7 +1462,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1376,10 +1483,10 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg])
+- Sort(orderBy=[assignedWindow$ ASC])
+- Exchange(distribution=[single])
- +- LocalSortWindowAggregate(window=[TumblingGroupWindow], select=[Partial_weightedAvg(b, a) AS wAvg])
+ +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Calc(select=[ts, b, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1401,119 +1508,12 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[c])
+ +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
</Resource>
</TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
- +- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
- +- Exchange(distribution=[single])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
- +- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
index e14de92..1b8adc1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
@@ -20,15 +20,15 @@ limitations under the License.
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
]]>
</Resource>
@@ -54,15 +54,15 @@ HashWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, long, 8, 10
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
]]>
</Resource>
@@ -71,15 +71,15 @@ HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[stri
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
]]>
</Resource>
@@ -88,16 +88,16 @@ HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
+SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
+- Sort(orderBy=[string ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[string]])
- +- LocalSortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, Partial_myWeightedAvg(long, int) AS EXPR$0])
+ +- LocalSortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long, 5)], select=[string, Partial_myWeightedAvg(long, int) AS EXPR$0])
+- Sort(orderBy=[string ASC, long ASC])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
]]>
@@ -107,15 +107,15 @@ SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[stri
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index d98cbd7..b017098 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -330,7 +330,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2)], window=[TumblingGroupWindow], properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
</Resource>
@@ -349,7 +349,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow], properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
</Resource>
@@ -372,7 +372,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
FlinkLogicalCalc(select=[a4, c4, s, EXPR$3])
+- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
+- FlinkLogicalCalc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -396,7 +396,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
FlinkLogicalCalc(select=[a4, c4, e, EXPR$3])
+- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
+- FlinkLogicalCalc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -419,7 +419,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
<![CDATA[
FlinkLogicalAggregate(group=[{0, 1}], c4=[AUXILIARY_GROUP($2)], EXPR$3=[COUNT()])
+- FlinkLogicalCalc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 1dcbe67..b82a3d5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -71,7 +71,7 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
<Resource name="planAfter">
<![CDATA[
Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
@@ -140,7 +140,7 @@ Calc(select=[b, w0$o0 AS $1])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(rt, -(rt0, 5000:INTERVAL SECOND)), <=(rt, +(rt0, 10000:INTERVAL SECOND)))], select=[b, a, rt, b0, a0, rt0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[b, a, w$rowtime AS rt])
- : +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ : +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[hash[b]])
: +- Calc(select=[b, rowtime, a])
: +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
@@ -269,7 +269,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
@@ -281,7 +281,7 @@ Sink(name=[appendSink1], fields=[a, b])
+- Reused(reference_id=[2])
Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
+- Reused(reference_id=[1])
@@ -329,7 +329,7 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
ship_strategy : HASH
: Operator
@@ -349,7 +349,7 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -520,10 +520,10 @@ LogicalProject(b=[$0], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, $SUM0(cnt) AS EXPR$1])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)], select=[b, $SUM0(cnt) AS EXPR$1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, w$rowtime AS $f1, cnt])
- +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
+- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 381c804..62f238d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -148,10 +148,10 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, $f0, 30000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[w$rowtime AS $f0, long, int])
- +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS int, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS int, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
]]>
@@ -236,7 +236,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, long])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], select=[long, MIN(rowtime0) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
@@ -286,7 +286,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
]]>
@@ -312,7 +312,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), w$end), 1:BIGINT)])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
@@ -341,7 +341,7 @@ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0], EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS rowtime, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[rowtime, long, int])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index df5adca..3d4f11f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -125,7 +125,7 @@ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[name, w$end AS EXPR$1, EXPR$2])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, rowtime, 600000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[name]])
+- Calc(select=[name, rowtime, val], where=[>(val, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index b42e44f..09ef6c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -68,7 +68,7 @@ LogicalProject(b=[$0], s=[$2])
<![CDATA[
Calc(select=[b, s])
+- Correlate(invocation=[explode($cor0.set)], correlate=[table(explode($cor0.set))], select=[b,set,f0], rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], joinType=[INNER])
- +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, COLLECT(b) AS set])
+ +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, COLLECT(b) AS set])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime], where=[<(b, 3)])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index c88f676..45f4725 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -41,7 +41,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, *(c, c) AS $f2])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -68,7 +68,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0), 60000:INTERVAL MINUTE)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, +(w$end, 60000:INTERVAL MINUTE) AS EXPR$1])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -160,7 +160,7 @@ LogicalProject(EXPR$0=[$1])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b]])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], select=[a, b, c])
+ +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -297,10 +297,10 @@ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)], EXPR$1=[TUMBLE_END($0)], a=[$1])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$rowtime AS EXPR$0, w$end AS EXPR$1, a])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f0, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[w$rowtime AS $f0, a])
- +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 2)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, a])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -333,13 +333,48 @@ LogicalProject(EXPR$0=[$2])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, d], select=[b, d, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, d]])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], select=[a, b, c, COUNT(*) AS d])
+ +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c, COUNT(*) AS d])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, rowtime
+ FROM MyTable
+)
+GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSessionFunction">
<Resource name="sql">
<![CDATA[
@@ -396,7 +431,7 @@ Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, d, ping_start]])
+- Calc(select=[b, d, w$start AS ping_start, c, a])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -431,7 +466,7 @@ Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, ping_start], select=[b, ping_start, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, ping_start]])
+- Calc(select=[b, w$start AS ping_start, c, a])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -460,46 +495,11 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, a])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, rowtime
- FROM MyTable
-)
-GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
- +- Exchange(distribution=[single])
- +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 5053e8d..ba5ea52 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -217,7 +217,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[rowtime, b, a0, b0])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0), 600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
@@ -403,7 +403,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
+GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0, 21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
+- Exchange(distribution=[hash[b0]])
+- Calc(select=[rowtime0, b0, a, b])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0), 600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index d5b8a85..95b1683 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -53,13 +53,13 @@ GroupAggregate(groupBy=[b], select=[b, COUNT(a) AS TMP_0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(DISTINCT a) AS EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)], select=[COUNT(DISTINCT a) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
index 93d56b8..2a44ce2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
@@ -151,7 +151,7 @@ Calc(select=[a AS a2, b AS b2])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
-+- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], $f5=[UPPER($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
@@ -159,7 +159,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1, b])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w, rowtime, 5)], select=[b, COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
@@ -170,14 +170,14 @@ Calc(select=[EXPR$0, EXPR$1, b])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], $f5=[UPPER($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
index ae8ceea..165c523 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
@@ -68,13 +68,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, rowtime, 8, 10)], select=[CO
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
]]>
@@ -100,13 +100,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, proctime, 2, 1)], select=[CO
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)], select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
]]>
@@ -132,13 +132,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w, proctime, 50, 50)], select=[
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 2)], select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
]]>
@@ -148,13 +148,13 @@ GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS EXPR$0]
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
]]>
@@ -164,14 +164,14 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3], EXPR$4=[$4], EXPR$5=[$5])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)], EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)], window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)], EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)], window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[EXPR$4, EXPR$5])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, EXPR$4, EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, a, b, c, *(c, c) AS $f4])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, a, b, c])
@@ -278,13 +278,13 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, rowtime, 8
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
]]>
@@ -294,13 +294,13 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, myWeightedAvg(long, int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], select=[string, myWeightedAvg(long, int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
]]>
@@ -328,7 +328,7 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, rowtime, 1
LogicalProject(EXPR$0=[$0])
+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], window=[SlidingGroupWindow('w2, proctime, 20, 10)], properties=[])
+- LogicalProject(proctime=[AS($2, _UTF-16LE'proctime')], string=[$0], EXPR$1=[$1])
- +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$0])
+ +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)], window=[TumblingGroupWindow('w1, proctime, 50)], properties=[EXPR$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
@@ -336,7 +336,7 @@ LogicalProject(EXPR$0=[$0])
<![CDATA[
GroupWindowAggregate(window=[SlidingGroupWindow('w2, proctime, 20, 10)], select=[COUNT(string) AS EXPR$0])
+- Exchange(distribution=[single])
- +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1, proctime('w1) AS EXPR$0])
+ +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w1, proctime, 50)], properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1, proctime('w1) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
]]>
@@ -362,13 +362,13 @@ GroupWindowAggregate(groupBy=[string], window=[SlidingGroupWindow('w, proctime,
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 2)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
]]>
@@ -378,13 +378,13 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, proctime])
]]>
@@ -428,13 +428,13 @@ Calc(select=[EXPR$0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[long, int, string, rowtime])
]]>
@@ -444,14 +444,14 @@ GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], s1=[AS(+($1, 1), _UTF-16LE's1')], s2=[AS(+($1, 3), _UTF-16LE's2')], x=[AS($2, _UTF-16LE'x')], x2=[AS($2, _UTF-16LE'x2')], x3=[AS($3, _UTF-16LE'x3')], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[string, +(EXPR$0, 1) AS s1, +(EXPR$0, 3) AS s2, EXPR$1 AS x, EXPR$1 AS x2, EXPR$2 AS x3, EXPR$2])
-+- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
++- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, int, string])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index fcdf107..e74e59b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -107,7 +107,7 @@ Calc(select=[name, val, id])
<Resource name="planBefore">
<![CDATA[
LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow], properties=[EXPR$0])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+- LogicalFilter(condition=[>($2, 100)])
+- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
]]>
@@ -115,7 +115,7 @@ LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
<Resource name="planAfter">
<![CDATA[
Calc(select=[name, EXPR$0, EXPR$1])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
+- Exchange(distribution=[hash[name]])
+- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])