You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/07/20 02:20:33 UTC

[flink] branch release-1.17 updated: [FLINK-32578][table-planner] Fix wrong plan which group by window time columns on a proctime window operator may result hang for ever

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new dc8b70c2fcb [FLINK-32578][table-planner] Fix wrong plan which group by window time columns on a proctime window operator may result hang for ever
dc8b70c2fcb is described below

commit dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34
Author: lincoln lee <li...@gmail.com>
AuthorDate: Thu Jul 20 10:20:26 2023 +0800

    [FLINK-32578][table-planner] Fix wrong plan which group by window time columns on a proctime window operator may result hang for ever
    
    This closes #23022
---
 .../stream/StreamPhysicalGroupAggregateRule.scala  |   6 +-
 .../stream/StreamPhysicalWindowAggregateRule.scala |   5 +-
 .../table/planner/plan/utils/WindowUtil.scala      |  51 ++-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 502 +++++++++++++++++++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 240 ++++++++++
 .../runtime/stream/sql/WindowAggregateITCase.scala |  47 +-
 6 files changed, 840 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index a7f71c2e515..94d02cd5447 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregate
@@ -50,10 +49,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(con
     }
 
     // check not window aggregate
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
-    val windowProperties = fmq.getRelWindowProperties(agg.getInput)
-    val grouping = agg.getGroupSet
-    !WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
+    !WindowUtil.isValidWindowAggregate(agg)
   }
 
   override def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
index 4778c4c9c0a..f59286c84ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
@@ -55,10 +55,7 @@ class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(co
       return false
     }
 
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
-    val windowProperties = fmq.getRelWindowProperties(agg.getInput)
-    val grouping = agg.getGroupSet
-    WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
+    WindowUtil.isValidWindowAggregate(agg)
   }
 
   override def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index fdd7a81c191..98d52357a25 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
 import org.apache.flink.table.planner.plan.logical._
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalJoin, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
 import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
 import org.apache.flink.table.planner.typeutils.RowTypeUtils
@@ -32,16 +33,19 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
 import org.apache.flink.table.types.logical.TimestampType
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
 
+import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall, Calc}
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeFamily
 import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.ImmutableBitSet
+import org.apache.calcite.util.{ImmutableBitSet, Util}
 
 import java.time.Duration
 import java.util.Collections
 
+import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -310,6 +314,21 @@ object WindowUtil {
       groupingTypes ++ accTypes.map(fromDataTypeToLogicalType) ++ sliceEndType)
   }
 
+  /**
+   * Decides whether the given aggregate qualifies as a window aggregate. The grouping must
+   * contain window start and end; a rowtime window needs nothing more, while a proctime window
+   * additionally requires a neighbouring windowTableFunctionCall below the aggregate input.
+   */
+  def isValidWindowAggregate(agg: FlinkLogicalAggregate): Boolean = {
+    val mq = FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster.getMetadataQuery)
+    val input = agg.getInput
+    val props = mq.getRelWindowProperties(input)
+    val groupsByWindow = WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, props)
+    // `&&` and `||` short-circuit, so the later checks run only when the earlier ones
+    // do not already decide the result, exactly as in an if/else cascade
+    groupsByWindow && (props.isRowtime || existNeighbourWindowTableFunc(input))
+  }
+
   // ------------------------------------------------------------------------------------------
   // Private Helpers
   // ------------------------------------------------------------------------------------------
@@ -343,4 +362,34 @@ object WindowUtil {
     }
   }
 
+  // Returns true if a window table function call is found below `rel` before reaching an
+  // operator (aggregate, rank, join) at which the search is deliberately stopped.
+  private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+
+    // Follows the input chain; true once a window table function call is reached.
+    @tailrec
+    def find(node: RelNode): Boolean = {
+      node match {
+        case rss: RelSubset =>
+          // prefer the best plan of the subset, else the originally registered rel
+          find(Option(rss.getBest).getOrElse(rss.getOriginal))
+
+        case scan: FlinkLogicalTableFunctionScan =>
+          if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) true
+          else if (scan.getInputs.isEmpty) false
+          else find(scan.getInput(0))
+
+        // proctime attribute comes from these operators can not be used directly for proctime
+        // window aggregate, so further traversal of child nodes is unnecessary
+        case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: FlinkLogicalJoin => false
+
+        case sr: SingleRel => find(sr.getInput)
+
+        // leaf, multi-input or null node: stop here instead of failing with a MatchError
+        case _ => false
+      }
+    }
+
+    find(rel)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 7c7de7a16b5..f1341241787 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1565,6 +1565,181 @@ Calc(select=[a, b, uv])
          +- Expand(projects=[{a, b, c, 0 AS $e, rowtime}, {a, null AS b, c, 4 AS $e, rowtime}, {null AS a, null AS b, c, 12 AS $e, rowtime}])
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInvalidRelaxFormCascadeProctimeWindow[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  ws,
+  we,
+  COUNT(*)
+FROM proctime_win
+GROUP BY a, ws, we
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$1], we=[$2])
+   +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+      +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+   +- Calc(select=[a, window_start AS ws, window_end AS we])
+      +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+         +- Exchange(distribution=[hash[a, b]])
+            +- Calc(select=[a, b, d, proctime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowDedup[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  ws,
+  we,
+  COUNT(*)
+FROM proctime_windedup
+GROUP BY a, ws, we
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$2], we=[$3])
+   +- LogicalFilter(condition=[<=($6, 1)])
+      +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+   +- Calc(select=[a, window_start AS ws, window_end AS we])
+      +- WindowDeduplicate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], keep=[LastRow], partitionKeys=[], orderKey=[proctime], order=[PROCTIME])
+         +- Exchange(distribution=[single])
+            +- Calc(select=[a, proctime, window_start, window_end])
+               +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                  +- Calc(select=[a, b, proctime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowJoin[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  ws,
+  we,
+  COUNT(*)
+FROM win_join
+GROUP BY a, ws, we
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[COALESCE($2, $6)], we=[COALESCE($3, $7)])
+   +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
+      :- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+      :  +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+      :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+      :           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+      :              +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+   +- Calc(select=[a, window_start AS ws, window_end AS we])
+      +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[true], select=[a, window_start, window_end, window_start0, window_end0])
+         :- Exchange(distribution=[single])
+         :  +- Calc(select=[a, window_start, window_end])
+         :     +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+         :        +- Calc(select=[a, proctime])
+         :           +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+         :              +- Calc(select=[a, PROCTIME() AS proctime, rowtime])
+         :                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, rowtime], metadata=[]]], fields=[a, rowtime])
+         +- Exchange(distribution=[single])
+            +- Calc(select=[window_start, window_end])
+               +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                  +- Calc(select=[proctime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowRank[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  ws,
+  we,
+  COUNT(*)
+FROM proctime_winrank
+GROUP BY a, ws, we
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$2], we=[$3])
+   +- LogicalFilter(condition=[<=($6, 10)])
+      +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+   +- Calc(select=[a, window_start AS ws, window_end AS we])
+      +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[proctime DESC], select=[a, proctime, window_start, window_end])
+         +- Exchange(distribution=[single])
+            +- Calc(select=[a, proctime, window_start, window_end])
+               +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                  +- Calc(select=[a, b, proctime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -1734,6 +1909,151 @@ Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
          +- Calc(select=[a, d, IS NOT NULL(b) AS $f4, b, e, c, rowtime], where=[>(b, 1000)])
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadeProctimeWindow_OnWindowDedup[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  window_start,
+  window_end,
+  COUNT(*)
+FROM TABLE(TUMBLE(TABLE proctime_windedup, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8])
+   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT rn, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], wt=[$4], new_proctime=[$5], rn=[$6])
+         +- LogicalFilter(condition=[<=($6, 1)])
+            +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+               +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[$6], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, $6])
+         +- WindowDeduplicate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], keep=[LastRow], partitionKeys=[], orderKey=[proctime], order=[PROCTIME])
+            +- Exchange(distribution=[single])
+               +- Calc(select=[a, proctime, window_start, window_end, PROCTIME() AS $6])
+                  +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                     +- Calc(select=[a, b, proctime])
+                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                           +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadeProctimeWindow_OnWindowJoin[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  window_start,
+  window_end,
+  COUNT(*)
+FROM TABLE(TUMBLE(TABLE win_join, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$5], window_end=[$6])
+   +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)], we=[COALESCE($3, $7)], new_proctime=[PROCTIME()])
+         +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
+            :- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+            :  +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+            :     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+            :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+            :           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+            :              +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+            +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+               +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[new_proctime], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, PROCTIME() AS new_proctime])
+         +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[true], select=[a, window_start, window_end, window_start0, window_end0])
+            :- Exchange(distribution=[single])
+            :  +- Calc(select=[a, window_start, window_end])
+            :     +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+            :        +- Calc(select=[a, proctime])
+            :           +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            :              +- Calc(select=[a, PROCTIME() AS proctime, rowtime])
+            :                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, rowtime], metadata=[]]], fields=[a, rowtime])
+            +- Exchange(distribution=[single])
+               +- Calc(select=[window_start, window_end])
+                  +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                     +- Calc(select=[proctime])
+                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                           +- Calc(select=[PROCTIME() AS proctime, rowtime])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadeProctimeWindow_OnWindowRank[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  window_start,
+  window_end,
+  COUNT(*)
+FROM TABLE(TUMBLE(TABLE proctime_winrank, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8])
+   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT rn, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], wt=[$4], new_proctime=[$5], rn=[$6])
+         +- LogicalFilter(condition=[<=($6, 10)])
+            +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+               +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[$6], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, $6])
+         +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[proctime DESC], select=[a, proctime, window_start, window_end, $6])
+            +- Exchange(distribution=[single])
+               +- Calc(select=[a, proctime, window_start, window_end, PROCTIME() AS $6])
+                  +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+                     +- Calc(select=[a, b, proctime])
+                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                           +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -1820,6 +2140,188 @@ Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5])
                   +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[5 min])], select=[a, b, COUNT(*) AS count1$0, SUM(d) AS sum$1, MAX(d) AS max$2, slice_end('w$) AS $slice_end])
                      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadingWindow_OnIndividualProctime[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  window_start,
+  window_end,
+  sum(cnt),
+  count(*)
+FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(new_proctime), INTERVAL '10' MINUTE))
+GROUP BY a, window_start, window_end
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)], EXPR$3=[COUNT()])
+   +- LogicalProject(a=[$0], window_start=[$9], window_end=[$10], cnt=[$6])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($8, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT cnt, DECIMAL(38, 3) sum_d, DECIMAL(10, 3) max_d, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$4], ws=[$1], we=[$2], wt=[$3], new_proctime=[PROCTIME()], cnt=[$5], sum_d=[$6], max_d=[$7])
+            +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+               +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+                  +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+                        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                              +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[new_proctime], size=[10 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, cnt, PROCTIME() AS new_proctime])
+         +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+            +- Exchange(distribution=[hash[a, b]])
+               +- Calc(select=[a, b, d, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadingWindow_OnInheritProctime[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  window_start,
+  window_end,
+  sum(cnt),
+  count(*)
+FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(wt), INTERVAL '10' MINUTE))
+GROUP BY a, window_start, window_end
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)], EXPR$3=[COUNT()])
+   +- LogicalProject(a=[$0], window_start=[$9], window_end=[$10], cnt=[$6])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($8, DESCRIPTOR($4), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT cnt, DECIMAL(38, 3) sum_d, DECIMAL(10, 3) max_d, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$4], ws=[$1], we=[$2], wt=[$3], new_proctime=[PROCTIME()], cnt=[$5], sum_d=[$6], max_d=[$7])
+            +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+               +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+                  +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+                        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                              +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[wt], size=[10 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, cnt, window_time AS wt])
+         +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+            +- Exchange(distribution=[hash[a, b]])
+               +- Calc(select=[a, b, d, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadingWindow_RelaxForm[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  window_start,
+  window_end,
+  COUNT(*)
+  FROM
+  (
+    SELECT
+    a,
+    window_start,
+    window_end,
+    COUNT(DISTINCT c) AS cnt
+    FROM TABLE(
+      TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+    GROUP BY a, b, window_start, window_end
+) GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$2], window_end=[$3])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT(DISTINCT $4)])
+      +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 86400000:INTERVAL DAY, 28800000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end])
+         +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+            +- Exchange(distribution=[hash[a, b]])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumble_CascadingWindow_RelaxForm[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  window_start,
+  window_end,
+  COUNT(*)
+  FROM
+  (
+    SELECT
+    a,
+    window_start,
+    window_end,
+    COUNT(DISTINCT c) AS cnt
+    FROM TABLE(
+      TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+    GROUP BY a, b, window_start, window_end
+) GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$2], window_end=[$3])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT(DISTINCT $4)])
+      +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 86400000:INTERVAL DAY, 28800000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+         +- Calc(select=[a, window_start, window_end])
+            +- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+               +- Exchange(distribution=[hash[a, b]])
+                  +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index efbb5756a49..cbf67d69066 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -65,6 +65,23 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
                                 |)
                                 |""".stripMargin)
 
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW proctime_win AS
+        |SELECT
+        |   a,
+        |   b,
+        |   window_start as ws,
+        |   window_end as we,
+        |   window_time as wt,
+        |   proctime() as new_proctime,
+        |   count(*) as cnt,
+        |   sum(d) as sum_d,
+        |   max(d) as max_d
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end, window_time, b
+      """.stripMargin)
+
     // set agg-phase strategy
     util.tableEnv.getConfig
       .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggPhaseEnforcer.toString)
@@ -278,6 +295,30 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
     util.verifyRelPlan(sql)
   }
 
+  @Test
+  def testTumble_CascadingWindow_RelaxForm(): Unit = {
+    // a relax form of cascaded rowtime window which is actually supported
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |  FROM
+        |  (
+        |    SELECT
+        |    a,
+        |    window_start,
+        |    window_end,
+        |    COUNT(DISTINCT c) AS cnt
+        |    FROM TABLE(
+        |      TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+        |    GROUP BY a, b, window_start, window_end
+        |) GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
   @Test
   def testTumble_DistinctSplitEnabled(): Unit = {
     util.tableEnv.getConfig
@@ -1009,6 +1050,205 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
       """.stripMargin
     util.verifyRelPlan(sql)
   }
+
+  @Test
+  def testTumble_CascadingWindow_OnIndividualProctime(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // a standard cascaded proctime window defined on a fresh PROCTIME() column (new_proctime)
+    util.verifyExecPlan(
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  sum(cnt),
+        |  count(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(new_proctime), INTERVAL '10' MINUTE))
+        |GROUP BY a, window_start, window_end
+        |""".stripMargin)
+  }
+
+  @Test
+  def testTumble_CascadingWindow_OnInheritProctime(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // a standard cascaded proctime window whose outer window reuses the inner window_time (wt)
+    util.verifyExecPlan(
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  sum(cnt),
+        |  count(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(wt), INTERVAL '10' MINUTE))
+        |GROUP BY a, window_start, window_end
+        |""".stripMargin)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // a relax form of cascaded proctime window unsupported for now, will be translated to group agg
+    util.verifyRelPlan("""
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_win
+                         |GROUP BY a, ws, we
+      """.stripMargin)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowRank(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // create window top10
+    createProctimeWindowTopN("proctime_winrank", 10)
+
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_winrank, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
+  private def createProctimeWindowTopN(viewName: String, topNum: Int): Unit = {
+    util.tableEnv.executeSql(
+      s"""
+         |CREATE VIEW $viewName AS
+         |SELECT *
+         |FROM(
+         | SELECT
+         |    a,
+         |    b,
+         |    window_start as ws,
+         |    window_end as we,
+         |    window_time as wt,
+         |    proctime() as new_proctime,
+         |    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY proctime DESC) AS rn
+         | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+         |) WHERE rn <= $topNum
+     """.stripMargin)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowRank(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // create window top10
+    createProctimeWindowTopN("proctime_winrank", 10)
+
+    // a relax form of cascaded proctime window on a window rank is unsupported for now, will be translated to group agg
+    util.verifyRelPlan("""
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_winrank
+                         |GROUP BY a, ws, we
+      """.stripMargin)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowDedup(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // create window dedup(top1)
+    createProctimeWindowTopN("proctime_windedup", 1)
+
+    // a standard cascaded proctime window (window TVF applied on new_proctime) over a window dedup
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_windedup, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowDedup(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // create window dedup(top1)
+    createProctimeWindowTopN("proctime_windedup", 1)
+
+    // a relax form of cascaded proctime window unsupported for now, will be translated to group agg
+    util.verifyRelPlan("""
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_windedup
+                         |GROUP BY a, ws, we
+      """.stripMargin)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowJoin(): Unit = {
+    assumeTrue(!isTwoPhase)
+    createWindowJoin() // registers the proctime_window and win_join views
+
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE win_join, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
+  private def createWindowJoin(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW proctime_window AS
+        |SELECT
+        |   a,
+        |   b,
+        |   window_start,
+        |   window_end
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+    """.stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW win_join AS
+        |SELECT
+        |   w1.a as a,
+        |   w1.b as b,
+        |   COALESCE(w1.window_start, w2.window_start) as ws,
+        |   COALESCE(w1.window_end, w2.window_end) as we,
+        |   proctime() as new_proctime
+        |FROM proctime_window w1 join proctime_window w2
+        |ON w1.window_start = w2.window_start AND w1.window_end = w2.window_end
+    """.stripMargin)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowJoin(): Unit = {
+    assumeTrue(!isTwoPhase)
+    createWindowJoin() // registers the proctime_window and win_join views
+
+    // a relaxed form of cascaded proctime window on a window join is unsupported for now, will be translated to group agg
+    util.verifyRelPlan("""
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM win_join
+                         |GROUP BY a, ws, we
+      """.stripMargin)
+  }
 }
 
 object WindowAggregateTest {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index ee956674011..725949a5ae0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
-import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
+import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._
@@ -942,6 +942,51 @@ class WindowAggregateITCase(
     }
     assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n"))
   }
+
+  @Test
+  def testRelaxFormProctimeCascadeWindowAgg(): Unit = {
+    val timestampDataId = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE proctime_src (
+                       | `ts` STRING,
+                       | `int` INT,
+                       | `double` DOUBLE,
+                       | `float` FLOAT,
+                       | `bigdec` DECIMAL(10, 2),
+                       | `string` STRING,
+                       | `name` STRING,
+                       | `proctime` AS PROCTIME()
+                       |) WITH (
+                       | 'connector' = 'values',
+                       | 'data-id' = '$timestampDataId',
+                       | 'failing-source' = 'true'
+                       |)
+                       |""".stripMargin)
+
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM
+        |(
+        |    SELECT
+        |    `name`,
+        |    window_start,
+        |    window_end,
+        |    COUNT(DISTINCT `string`) AS cnt
+        |    FROM TABLE(
+        |      TUMBLE(TABLE proctime_src, DESCRIPTOR(proctime), INTERVAL '1' SECOND))
+        |    GROUP BY `name`, window_start, window_end
+        |) GROUP BY window_start, window_end
+        """.stripMargin
+    val sink = new TestingRetractSink()
+    val res = tEnv.sqlQuery(sql)
+    res.toRetractStream[Row].addSink(sink)
+    // do not verify the result due to proctime window aggregate result is non-deterministic
+    env.execute()
+  }
 }
 
 object WindowAggregateITCase {