You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/07/20 02:20:33 UTC
[flink] branch release-1.17 updated: [FLINK-32578][table-planner] Fix wrong plan where grouping by window time columns on a proctime window operator may hang forever
This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new dc8b70c2fcb [FLINK-32578][table-planner] Fix wrong plan where grouping by window time columns on a proctime window operator may hang forever
dc8b70c2fcb is described below
commit dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34
Author: lincoln lee <li...@gmail.com>
AuthorDate: Thu Jul 20 10:20:26 2023 +0800
[FLINK-32578][table-planner] Fix wrong plan where grouping by window time columns on a proctime window operator may hang forever
This closes #23022
---
.../stream/StreamPhysicalGroupAggregateRule.scala | 6 +-
.../stream/StreamPhysicalWindowAggregateRule.scala | 5 +-
.../table/planner/plan/utils/WindowUtil.scala | 51 ++-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 502 +++++++++++++++++++++
.../plan/stream/sql/agg/WindowAggregateTest.scala | 240 ++++++++++
.../runtime/stream/sql/WindowAggregateITCase.scala | 47 +-
6 files changed, 840 insertions(+), 11 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index a7f71c2e515..94d02cd5447 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregate
@@ -50,10 +49,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(con
}
// check not window aggregate
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
- val windowProperties = fmq.getRelWindowProperties(agg.getInput)
- val grouping = agg.getGroupSet
- !WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
+ !WindowUtil.isValidWindowAggregate(agg)
}
override def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
index 4778c4c9c0a..f59286c84ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
@@ -55,10 +55,7 @@ class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(co
return false
}
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
- val windowProperties = fmq.getRelWindowProperties(agg.getInput)
- val grouping = agg.getGroupSet
- WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
+ WindowUtil.isValidWindowAggregate(agg)
}
override def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index fdd7a81c191..98d52357a25 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalJoin, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
@@ -32,16 +33,19 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
+import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rel.core.{Aggregate, AggregateCall, Calc}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.ImmutableBitSet
+import org.apache.calcite.util.{ImmutableBitSet, Util}
import java.time.Duration
import java.util.Collections
+import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -310,6 +314,21 @@ object WindowUtil {
groupingTypes ++ accTypes.map(fromDataTypeToLogicalType) ++ sliceEndType)
}
+  /**
+   * Decides whether the given aggregate can be planned as a window aggregate.
+   *
+   * The aggregate's grouping must contain both the window start and the window end of its
+   * input's window properties. For a rowtime window that is sufficient. For a proctime window a
+   * window table function call must additionally be reachable from the aggregate's input (see
+   * `existNeighbourWindowTableFunc`); otherwise the aggregate is not a valid window aggregate.
+   *
+   * @param agg the logical aggregate to check
+   * @return true if `agg` qualifies for conversion into a window aggregate
+   */
+  def isValidWindowAggregate(agg: FlinkLogicalAggregate): Boolean = {
+    val input = agg.getInput
+    val mq = FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster.getMetadataQuery)
+    val props = mq.getRelWindowProperties(input)
+    val hasWindowStartEnd = WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, props)
+    // short-circuits: the neighbour lookup only runs when the window is not rowtime
+    hasWindowStartEnd && (props.isRowtime || existNeighbourWindowTableFunc(input))
+  }
+
// ------------------------------------------------------------------------------------------
// Private Helpers
// ------------------------------------------------------------------------------------------
@@ -343,4 +362,34 @@ object WindowUtil {
}
}
+  /**
+   * Returns true if a window table function call can be reached from `rel` by walking down
+   * through single-input operators, i.e. the proctime window attributes seen by the parent
+   * aggregate come from a neighbouring window table function.
+   *
+   * The walk stops with false at aggregates, ranks and joins, and at any other node that is
+   * neither a `RelSubset`, a table function scan nor a `SingleRel` (e.g. a union or a leaf
+   * scan), so the check is conservative instead of failing with a `MatchError`.
+   *
+   * @param rel the input of the aggregate being checked
+   * @return true if a window table function call is found
+   */
+  private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+
+    @tailrec
+    def find(node: RelNode): Boolean = node match {
+      case rss: RelSubset =>
+        // inputs may be Volcano RelSubsets: inspect the best member, else the original one
+        find(Option(rss.getBest).getOrElse(rss.getOriginal))
+
+      case scan: FlinkLogicalTableFunctionScan =>
+        if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+          true
+        } else if (scan.getInputs.isEmpty) {
+          // a table function scan without input has nothing below it to search
+          false
+        } else {
+          find(scan.getInput(0))
+        }
+
+      // proctime attributes produced by these operators can not be used directly by a proctime
+      // window aggregate, so further traversal of child nodes is unnecessary
+      case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: FlinkLogicalJoin => false
+
+      case sr: SingleRel => find(sr.getInput)
+
+      // any other node (multi-input, leaf, or a missing subset member): no neighbouring window
+      // table function can be proven
+      case _ => false
+    }
+
+    find(rel)
+  }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 7c7de7a16b5..f1341241787 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1565,6 +1565,181 @@ Calc(select=[a, b, uv])
+- Expand(projects=[{a, b, c, 0 AS $e, rowtime}, {a, null AS b, c, 4 AS $e, rowtime}, {null AS a, null AS b, c, 12 AS $e, rowtime}])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInvalidRelaxFormCascadeProctimeWindow[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ ws,
+ we,
+ COUNT(*)
+FROM proctime_win
+GROUP BY a, ws, we
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$1], we=[$2])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+ +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+ +- Calc(select=[a, window_start AS ws, window_end AS we])
+ +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+ +- Exchange(distribution=[hash[a, b]])
+ +- Calc(select=[a, b, d, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowDedup[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ ws,
+ we,
+ COUNT(*)
+FROM proctime_windedup
+GROUP BY a, ws, we
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$2], we=[$3])
+ +- LogicalFilter(condition=[<=($6, 1)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+ +- Calc(select=[a, window_start AS ws, window_end AS we])
+ +- WindowDeduplicate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], keep=[LastRow], partitionKeys=[], orderKey=[proctime], order=[PROCTIME])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[a, proctime, window_start, window_end])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[a, b, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowJoin[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ ws,
+ we,
+ COUNT(*)
+FROM win_join
+GROUP BY a, ws, we
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[COALESCE($2, $6)], we=[COALESCE($3, $7)])
+ +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+ +- Calc(select=[a, window_start AS ws, window_end AS we])
+ +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[true], select=[a, window_start, window_end, window_start0, window_end0])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, window_start, window_end])
+ : +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ : +- Calc(select=[a, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, rowtime], metadata=[]]], fields=[a, rowtime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInvalidRelaxFormCascadeProctimeWindow_OnWindowRank[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ ws,
+ we,
+ COUNT(*)
+FROM proctime_winrank
+GROUP BY a, ws, we
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], ws=[$2], we=[$3])
+ +- LogicalFilter(condition=[<=($6, 10)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a, ws, we], select=[a, ws, we, COUNT(*) AS EXPR$3])
++- Exchange(distribution=[hash[a, ws, we]])
+ +- Calc(select=[a, window_start AS ws, window_end AS we])
+ +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[proctime DESC], select=[a, proctime, window_start, window_end])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[a, proctime, window_start, window_end])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[a, b, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
]]>
</Resource>
</TestCase>
@@ -1734,6 +1909,151 @@ Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
+- Calc(select=[a, d, IS NOT NULL(b) AS $f4, b, e, c, rowtime], where=[>(b, 1000)])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadeProctimeWindow_OnWindowDedup[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(*)
+FROM TABLE(TUMBLE(TABLE proctime_windedup, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT rn, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], wt=[$4], new_proctime=[$5], rn=[$6])
+ +- LogicalFilter(condition=[<=($6, 1)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[$6], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, $6])
+ +- WindowDeduplicate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], keep=[LastRow], partitionKeys=[], orderKey=[proctime], order=[PROCTIME])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[a, proctime, window_start, window_end, PROCTIME() AS $6])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[a, b, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadeProctimeWindow_OnWindowJoin[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(*)
+FROM TABLE(TUMBLE(TABLE win_join, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$5], window_end=[$6])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)], we=[COALESCE($3, $7)], new_proctime=[PROCTIME()])
+ +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[new_proctime], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, PROCTIME() AS new_proctime])
+ +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[true], select=[a, window_start, window_end, window_start0, window_end0])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, window_start, window_end])
+ : +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ : +- Calc(select=[a, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, rowtime], metadata=[]]], fields=[a, rowtime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadeProctimeWindow_OnWindowRank[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(*)
+FROM TABLE(TUMBLE(TABLE proctime_winrank, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT rn, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], wt=[$4], new_proctime=[$5], rn=[$6])
+ +- LogicalFilter(condition=[<=($6, 10)])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$7], we=[$8], wt=[$9], new_proctime=[PROCTIME()], rn=[ROW_NUMBER() OVER (PARTITION BY $7, $8 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[$6], size=[5 min])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, $6])
+ +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[proctime DESC], select=[a, proctime, window_start, window_end, $6])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[a, proctime, window_start, window_end, PROCTIME() AS $6])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[5 min])])
+ +- Calc(select=[a, b, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, rowtime], metadata=[]]], fields=[a, b, rowtime])
]]>
</Resource>
</TestCase>
@@ -1820,6 +2140,188 @@ Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5])
+- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[5 min])], select=[a, b, COUNT(*) AS count1$0, SUM(d) AS sum$1, MAX(d) AS max$2, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadingWindow_OnIndividualProctime[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ sum(cnt),
+ count(*)
+FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(new_proctime), INTERVAL '10' MINUTE))
+GROUP BY a, window_start, window_end
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)], EXPR$3=[COUNT()])
+ +- LogicalProject(a=[$0], window_start=[$9], window_end=[$10], cnt=[$6])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($8, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT cnt, DECIMAL(38, 3) sum_d, DECIMAL(10, 3) max_d, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$4], ws=[$1], we=[$2], wt=[$3], new_proctime=[PROCTIME()], cnt=[$5], sum_d=[$6], max_d=[$7])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+ +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[new_proctime], size=[10 min])], select=[a, $SUM0(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, cnt, PROCTIME() AS new_proctime])
+ +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+ +- Exchange(distribution=[hash[a, b]])
+ +- Calc(select=[a, b, d, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadingWindow_OnInheritProctime[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ sum(cnt),
+ count(*)
+FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(wt), INTERVAL '10' MINUTE))
+GROUP BY a, window_start, window_end
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)], EXPR$3=[COUNT()])
+ +- LogicalProject(a=[$0], window_start=[$9], window_end=[$10], cnt=[$6])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($8, DESCRIPTOR($4), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, TIMESTAMP_LTZ(3) *PROCTIME* wt, TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, BIGINT cnt, DECIMAL(38, 3) sum_d, DECIMAL(10, 3) max_d, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$4], ws=[$1], we=[$2], wt=[$3], new_proctime=[PROCTIME()], cnt=[$5], sum_d=[$6], max_d=[$7])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}], cnt=[COUNT()], sum_d=[SUM($5)], max_d=[MAX($5)])
+ +- LogicalProject(a=[$0], ws=[$7], we=[$8], wt=[$9], b=[$1], d=[$3])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[wt], size=[10 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, cnt, window_time AS wt])
+ +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
+ +- Exchange(distribution=[hash[a, b]])
+ +- Calc(select=[a, b, d, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, d, rowtime], metadata=[]]], fields=[a, b, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadingWindow_RelaxForm[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(*)
+ FROM
+ (
+ SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(DISTINCT c) AS cnt
+ FROM TABLE(
+ TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+ GROUP BY a, b, window_start, window_end
+) GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$2], window_end=[$3])
+ +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT(DISTINCT $4)])
+ +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 86400000:INTERVAL DAY, 28800000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, window_start, window_end])
+ +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a, b]])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumble_CascadingWindow_RelaxForm[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(*)
+ FROM
+ (
+ SELECT
+ a,
+ window_start,
+ window_end,
+ COUNT(DISTINCT c) AS cnt
+ FROM TABLE(
+ TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+ GROUP BY a, b, window_start, window_end
+) GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
++- LogicalProject(a=[$0], window_start=[$2], window_end=[$3])
+ +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT(DISTINCT $4)])
+ +- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 86400000:INTERVAL DAY, 28800000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3])
++- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+ +- Calc(select=[a, window_start, window_end])
+ +- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a, b]])
+ +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index efbb5756a49..cbf67d69066 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -65,6 +65,23 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
|)
|""".stripMargin)
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW proctime_win AS
+ |SELECT
+ | a,
+ | b,
+ | window_start as ws,
+ | window_end as we,
+ | window_time as wt,
+ | proctime() as new_proctime,
+ | count(*) as cnt,
+ | sum(d) as sum_d,
+ | max(d) as max_d
+ |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+ |GROUP BY a, window_start, window_end, window_time, b
+ """.stripMargin)
+
// set agg-phase strategy
util.tableEnv.getConfig
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggPhaseEnforcer.toString)
@@ -278,6 +295,30 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
util.verifyRelPlan(sql)
}
+  @Test
+  def testTumble_CascadingWindow_RelaxForm(): Unit = {
+    // Relaxed cascading form on a rowtime window: the outer query groups directly by the
+    // inner window_start/window_end instead of applying a new window TVF. For rowtime this
+    // form is supported and is expected to plan as a cascaded window aggregate.
+    val sql =
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |  FROM
+        |  (
+        |    SELECT
+        |    a,
+        |    window_start,
+        |    window_end,
+        |    COUNT(DISTINCT c) AS cnt
+        |    FROM TABLE(
+        |      TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))
+        |    GROUP BY a, b, window_start, window_end
+        |) GROUP BY a, window_start, window_end
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
@Test
def testTumble_DistinctSplitEnabled(): Unit = {
util.tableEnv.getConfig
@@ -1009,6 +1050,205 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
""".stripMargin
util.verifyRelPlan(sql)
}
+
+  @Test
+  def testTumble_CascadingWindow_OnIndividualProctime(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Standard cascading form: a new TUMBLE TVF over `proctime_win`, driven by
+    // `new_proctime`, a fresh PROCTIME() column computed in the view (not the inner
+    // window's `window_time`).
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  sum(cnt),
+        |  count(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(new_proctime), INTERVAL '10' MINUTE))
+        |GROUP BY a, window_start, window_end
+        |""".stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testTumble_CascadingWindow_OnInheritProctime(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Standard cascading form: a new TUMBLE TVF over `proctime_win` whose time attribute is
+    // `wt`, i.e. the `window_time` inherited from the inner proctime window.
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  sum(cnt),
+        |  count(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_win, DESCRIPTOR(wt), INTERVAL '10' MINUTE))
+        |GROUP BY a, window_start, window_end
+        |""".stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Relaxed cascading form: group directly by the inner window bounds (`ws`, `we`) of
+    // `proctime_win` without a new window TVF. This is not yet supported as a proctime
+    // window aggregate, so the planner is expected to fall back to a regular group agg.
+    val sql = """
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_win
+                         |GROUP BY a, ws, we
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowRank(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Register a per-window top-10 view.
+    createProctimeWindowTopN(viewName = "proctime_winrank", topNum = 10)
+
+    // Standard cascading form: a new TUMBLE TVF on `new_proctime` over the window rank.
+    val sql =
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_winrank, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  /**
+   * Registers view `viewName` holding, per 5-minute proctime TUMBLE window over `MyTable`,
+   * the rows with `ROW_NUMBER() <= topNum` when ordered by `proctime` descending.
+   * `topNum = 1` is used by the tests as a window dedup. The view exposes the inner window
+   * bounds as `ws`/`we`, the window time as `wt`, and a fresh `new_proctime` column.
+   *
+   * @param viewName name of the view to create
+   * @param topNum   inclusive upper bound on the row number kept per window
+   */
+  private def createProctimeWindowTopN(viewName: String, topNum: Int): Unit = {
+    val ddl =
+      s"""
+         |CREATE VIEW $viewName AS
+         |SELECT *
+         |FROM(
+         |  SELECT
+         |    a,
+         |    b,
+         |    window_start as ws,
+         |    window_end as we,
+         |    window_time as wt,
+         |    proctime() as new_proctime,
+         |    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY proctime DESC) AS rn
+         |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+         |) WHERE rn <= $topNum
+      """.stripMargin
+    util.tableEnv.executeSql(ddl)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowRank(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Register a per-window top-10 view.
+    createProctimeWindowTopN(viewName = "proctime_winrank", topNum = 10)
+
+    // Relaxed cascading form over a window rank (group directly by `ws`, `we`): not yet
+    // supported as a proctime window aggregate, expected to become a regular group agg.
+    val sql = """
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_winrank
+                         |GROUP BY a, ws, we
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowDedup(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // create window dedup(top1)
+    createProctimeWindowTopN("proctime_windedup", 1)
+
+    // a standard cascaded proctime window (a new TUMBLE TVF on `new_proctime`) on top of a
+    // window dedup; the relaxed form is covered by the corresponding testInvalidRelaxForm* test
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE proctime_windedup, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowDedup(): Unit = {
+    // Skipped when the two-phase aggregation strategy is enforced.
+    assumeTrue(!isTwoPhase)
+    // Register a per-window top-1 (dedup) view.
+    createProctimeWindowTopN(viewName = "proctime_windedup", topNum = 1)
+
+    // Relaxed cascading form over a window dedup (group directly by `ws`, `we`): not yet
+    // supported as a proctime window aggregate, expected to become a regular group agg.
+    val sql = """
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM proctime_windedup
+                         |GROUP BY a, ws, we
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testTumble_CascadeProctimeWindow_OnWindowJoin(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // registers the `proctime_window` and `win_join` views; explicit `()` because the
+    // method is side-effecting (auto-application is deprecated since Scala 2.13)
+    createWindowJoin()
+
+    // a standard cascaded proctime window (a new TUMBLE TVF on `new_proctime`) on top of
+    // the joined windows
+    util.verifyRelPlan(
+      """
+        |SELECT
+        |  a,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM TABLE(TUMBLE(TABLE win_join, DESCRIPTOR(new_proctime), INTERVAL '5' MINUTE))
+        |GROUP BY a, window_start, window_end
+      """.stripMargin)
+  }
+
+  /**
+   * Registers two views:
+   *  - `proctime_window`: 5-minute proctime TUMBLE windows over `MyTable` (no aggregation);
+   *  - `win_join`: `proctime_window` joined with itself on equal window_start/window_end,
+   *    exposing `a`, `b`, the window bounds as `ws`/`we`, and a fresh `new_proctime`.
+   */
+  private def createWindowJoin(): Unit = {
+    val windowViewDdl =
+      """
+        |CREATE VIEW proctime_window AS
+        |SELECT
+        |  a,
+        |  b,
+        |  window_start,
+        |  window_end
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
+      """.stripMargin
+    val joinViewDdl =
+      """
+        |CREATE VIEW win_join AS
+        |SELECT
+        |  w1.a as a,
+        |  w1.b as b,
+        |  COALESCE(w1.window_start, w2.window_start) as ws,
+        |  COALESCE(w1.window_end, w2.window_end) as we,
+        |  proctime() as new_proctime
+        |FROM proctime_window w1 join proctime_window w2
+        |ON w1.window_start = w2.window_start AND w1.window_end = w2.window_end
+      """.stripMargin
+    // Order matters: `win_join` references `proctime_window`.
+    Seq(windowViewDdl, joinViewDdl).foreach(ddl => util.tableEnv.executeSql(ddl))
+  }
+
+  @Test
+  def testInvalidRelaxFormCascadeProctimeWindow_OnWindowJoin(): Unit = {
+    assumeTrue(!isTwoPhase)
+    // registers the `proctime_window` and `win_join` views; explicit `()` because the
+    // method is side-effecting (auto-application is deprecated since Scala 2.13)
+    createWindowJoin()
+
+    // a relax form of cascaded proctime window on a window join is unsupported for now, will be translated to group agg
+    util.verifyRelPlan("""
+                         |SELECT
+                         |  a,
+                         |  ws,
+                         |  we,
+                         |  COUNT(*)
+                         |FROM win_join
+                         |GROUP BY a, ws, we
+      """.stripMargin)
+  }
}
object WindowAggregateTest {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index ee956674011..725949a5ae0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
-import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
+import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._
@@ -942,6 +942,51 @@ class WindowAggregateITCase(
}
assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n"))
}
+
+  @Test
+  def testRelaxFormProctimeCascadeWindowAgg(): Unit = {
+    // Relaxed cascading form over a proctime window: the outer query groups directly by the
+    // inner window_start/window_end without a new window TVF. Only execution is checked.
+    val timestampDataId = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE proctime_src (
+                       |  `ts` STRING,
+                       |  `int` INT,
+                       |  `double` DOUBLE,
+                       |  `float` FLOAT,
+                       |  `bigdec` DECIMAL(10, 2),
+                       |  `string` STRING,
+                       |  `name` STRING,
+                       |  `proctime` AS PROCTIME()
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '$timestampDataId',
+                       |  'failing-source' = 'true'
+                       |)
+                       |""".stripMargin)
+
+    val sql =
+      """
+        |SELECT
+        |  window_start,
+        |  window_end,
+        |  COUNT(*)
+        |FROM
+        |(
+        |  SELECT
+        |    `name`,
+        |    window_start,
+        |    window_end,
+        |    COUNT(DISTINCT `string`) AS cnt
+        |  FROM TABLE(
+        |     TUMBLE(TABLE proctime_src, DESCRIPTOR(proctime), INTERVAL '1' SECOND))
+        |  GROUP BY `name`, window_start, window_end
+        |) GROUP BY window_start, window_end
+      """.stripMargin
+    val sink = new TestingRetractSink()
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
+    // The sink contents are intentionally not asserted because proctime window aggregate
+    // results are non-deterministic.
+    env.execute()
+  }
}
object WindowAggregateITCase {