You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@flink.apache.org by go...@apache.org on 2021/11/01 09:52:16 UTC
[flink] branch release-1.13 updated: [FLINK-23919][table-planner-blink] Fix field name conflict bug in WindowUtil
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 934aa94 [FLINK-23919][table-planner-blink] Fix field name conflict bug in WindowUtil
934aa94 is described below
commit 934aa94c8509149079e375879ff5d3d4b86e15ad
Author: Jing <be...@126.com>
AuthorDate: Fri Oct 29 19:15:50 2021 +0800
[FLINK-23919][table-planner-blink] Fix field name conflict bug in WindowUtil
This closes #17604
(cherry picked from commit 003df215b482c246c48c147b63b56608c6557cba)
---
.../table/planner/plan/utils/WindowUtil.scala | 13 ++--
.../plan/stream/sql/agg/WindowAggregateTest.xml | 69 ++++++++++++++++++++++
.../plan/stream/sql/agg/WindowAggregateTest.scala | 13 ++++
.../runtime/stream/sql/WindowAggregateITCase.scala | 33 +++++++++++
4 files changed, 124 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 73774cf..579fc30 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
@@ -43,6 +44,7 @@ import java.time.Duration
import java.util.Collections
import scala.collection.JavaConversions._
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
@@ -119,10 +121,10 @@ object WindowUtil {
var containsTimeAttribute = false
var newTimeAttributeIndex = -1
val calcFieldShifting = ArrayBuffer[Int]()
-
+ val visitedProjectNames = new mutable.ArrayBuffer[String]
oldProgram.getNamedProjects.foreach { namedProject =>
val expr = oldProgram.expandLocalRef(namedProject.left)
- val name = namedProject.right
+ val uniqueName = RowTypeUtils.getUniqueName(namedProject.right, visitedProjectNames)
// project columns except window columns
expr match {
case inputRef: RexInputRef if windowColumns.contains(inputRef.getIndex) =>
@@ -130,7 +132,8 @@ object WindowUtil {
case _ =>
try {
- programBuilder.addProject(expr, name)
+ programBuilder.addProject(expr, uniqueName)
+ visitedProjectNames += uniqueName
} catch {
case e: Throwable =>
e.printStackTrace()
@@ -149,9 +152,11 @@ object WindowUtil {
// append time attribute if the calc doesn't refer it
if (!containsTimeAttribute) {
+ val oldTimeAttributeFieldName = inputRowType.getFieldNames.get(inputTimeAttributeIndex)
+ val uniqueName = RowTypeUtils.getUniqueName(oldTimeAttributeFieldName, visitedProjectNames)
programBuilder.addProject(
inputTimeAttributeIndex,
- inputRowType.getFieldNames.get(inputTimeAttributeIndex))
+ uniqueName)
newTimeAttributeIndex = programBuilder.getProjectList.size() - 1
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index f81ff85..75a2a78 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1029,6 +1029,75 @@ Calc(select=[a, b, uv])
]]>
</Resource>
</TestCase>
+ <TestCase name="testFieldNameConflict[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT window_time,
+ MIN(rowtime) as start_time,
+ MAX(rowtime) as end_time
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+GROUP BY window_start, window_end, window_time
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_time, start_time, end_time])
++- WindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS start_time, MAX(rowtime) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFieldNameConflict[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT window_time,
+ MIN(rowtime) as start_time,
+ MAX(rowtime) as end_time
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+GROUP BY window_start, window_end, window_time
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_time, start_time, end_time])
++- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[MIN(min$0) AS start_time, MAX(max$1) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+ +- Exchange(distribution=[single])
+ +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS min$0, MAX(rowtime) AS max$1, slice_end('w$) AS $slice_end])
+ +- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testHop_Cube[aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 90a6810..49d1020 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -1000,6 +1000,19 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
""".stripMargin
util.verifyRelPlan(sql)
}
+
+ @Test
+ def testFieldNameConflict(): Unit = { // FLINK-23919: rowtime is both the window time column and a MIN/MAX input; plan must rename one copy (rowtime_0)
+ val sql =
+ """
+ |SELECT window_time,
+ | MIN(rowtime) as start_time,
+ | MAX(rowtime) as end_time
+ |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ |GROUP BY window_start, window_end, window_time
+ """.stripMargin
+ util.verifyRelPlan(sql) // compared against testFieldNameConflict[aggPhaseEnforcer=...] in WindowAggregateTest.xml
+ }
}
object WindowAggregateTest {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 2f449a9..07562e1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -659,6 +659,39 @@ class WindowAggregateITCase(
CumulateWindowRollupExpectedData.sorted.mkString("\n"),
sink.getAppendResults.sorted.mkString("\n"))
}
+
+ @Test
+ def testFieldNameConflict(): Unit = { // FLINK-23919: runtime check for a query that both windows on rowtime and aggregates MIN/MAX(rowtime)
+ val sql =
+ """
+ |SELECT
+ | window_time,
+ | MIN(rowtime) as start_time,
+ | MAX(rowtime) as end_time
+ |FROM TABLE(
+ | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+ |GROUP BY window_start, window_end, window_time
+ """.stripMargin
+
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = if (useTimestampLtz) { // LTZ rows render with a trailing 'Z' (instant form); TIMESTAMP rows render without a zone
+ Seq(
+ "2020-10-09T16:00:04.999Z,2020-10-09T16:00:01Z,2020-10-09T16:00:04Z",
+ "2020-10-09T16:00:09.999Z,2020-10-09T16:00:06Z,2020-10-09T16:00:08Z",
+ "2020-10-09T16:00:19.999Z,2020-10-09T16:00:16Z,2020-10-09T16:00:16Z",
+ "2020-10-09T16:00:34.999Z,2020-10-09T16:00:32Z,2020-10-09T16:00:34Z")
+ } else {
+ Seq(
+ "2020-10-10T00:00:04.999,2020-10-10T00:00:01,2020-10-10T00:00:04",
+ "2020-10-10T00:00:09.999,2020-10-10T00:00:06,2020-10-10T00:00:08",
+ "2020-10-10T00:00:19.999,2020-10-10T00:00:16,2020-10-10T00:00:16",
+ "2020-10-10T00:00:34.999,2020-10-10T00:00:32,2020-10-10T00:00:34")
+ }
+ assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) // both sides sorted so the check does not depend on emission order
+ }
}
object WindowAggregateITCase {