You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to commits@flink.apache.org by go...@apache.org on 2021/11/01 09:01:54 UTC

[flink] branch release-1.14 updated: [FLINK-23919][table-planner] Fix field name conflict bug in WindowUtil

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 5b3e3a8  [FLINK-23919][table-planner] Fix field name conflict bug in WindowUtil
5b3e3a8 is described below

commit 5b3e3a8fd1dec3a41a7ff41835dc11456ad6836b
Author: Jing <be...@126.com>
AuthorDate: Fri Oct 29 19:15:50 2021 +0800

    [FLINK-23919][table-planner] Fix field name conflict bug in WindowUtil
    
    This closes #17604
    
    (cherry picked from commit 003df215b482c246c48c147b63b56608c6557cba)
---
 .../table/planner/plan/utils/WindowUtil.scala      | 13 +++--
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 67 ++++++++++++++++++++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 13 +++++
 .../runtime/stream/sql/WindowAggregateITCase.scala | 33 +++++++++++
 4 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 37056b8..b95565e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.logical._
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
 import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.logical.TimestampType
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
@@ -43,6 +44,7 @@ import java.time.Duration
 import java.util.Collections
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -119,10 +121,10 @@ object WindowUtil {
     var containsTimeAttribute = false
     var newTimeAttributeIndex = -1
     val calcFieldShifting = ArrayBuffer[Int]()
-
+    val visitedProjectNames = new mutable.ArrayBuffer[String]
     oldProgram.getNamedProjects.foreach { namedProject =>
       val expr = oldProgram.expandLocalRef(namedProject.left)
-      val name = namedProject.right
+      val uniqueName = RowTypeUtils.getUniqueName(namedProject.right, visitedProjectNames)
       // project columns except window columns
       expr match {
         case inputRef: RexInputRef if windowColumns.contains(inputRef.getIndex) =>
@@ -130,7 +132,8 @@ object WindowUtil {
 
         case _ =>
           try {
-            programBuilder.addProject(expr, name)
+            programBuilder.addProject(expr, uniqueName)
+            visitedProjectNames += uniqueName
           } catch {
             case e: Throwable =>
               e.printStackTrace()
@@ -149,9 +152,11 @@ object WindowUtil {
 
     // append time attribute if the calc doesn't refer it
     if (!containsTimeAttribute) {
+      val oldTimeAttributeFieldName = inputRowType.getFieldNames.get(inputTimeAttributeIndex)
+      val uniqueName = RowTypeUtils.getUniqueName(oldTimeAttributeFieldName, visitedProjectNames)
       programBuilder.addProject(
         inputTimeAttributeIndex,
-        inputRowType.getFieldNames.get(inputTimeAttributeIndex))
+        uniqueName)
       newTimeAttributeIndex = programBuilder.getProjectList.size() - 1
     }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 5141a01..b1a461f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -999,6 +999,73 @@ Calc(select=[a, b, uv])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFieldNameConflict[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT window_time,
+  MIN(rowtime) as start_time,
+  MAX(rowtime) as end_time
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+GROUP BY window_start, window_end, window_time
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_time, start_time, end_time])
++- WindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS start_time, MAX(rowtime) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFieldNameConflict[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT window_time,
+  MIN(rowtime) as start_time,
+  MAX(rowtime) as end_time
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+GROUP BY window_start, window_end, window_time
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
++- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_time, start_time, end_time])
++- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[MIN(min$0) AS start_time, MAX(max$1) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   +- Exchange(distribution=[single])
+      +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS min$0, MAX(rowtime) AS max$1, slice_end('w$) AS $slice_end])
+         +- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testHop_Cube[aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 96d3988..d1b9a62 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -999,6 +999,19 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
       """.stripMargin
     util.verifyRelPlan(sql)
   }
+
+  @Test
+  def testFieldNameConflict(): Unit = {
+    val sql =
+      """
+        |SELECT window_time,
+        |  MIN(rowtime) as start_time,
+        |  MAX(rowtime) as end_time
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+        |GROUP BY window_start, window_end, window_time
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
 }
 
 object WindowAggregateTest {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 3e79333..d0597d6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -900,6 +900,39 @@ class WindowAggregateITCase(
       CumulateWindowRollupExpectedData.sorted.mkString("\n"),
       sink.getAppendResults.sorted.mkString("\n"))
   }
+
+  @Test
+  def testFieldNameConflict(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  window_time,
+        |  MIN(rowtime) as start_time,
+        |  MAX(rowtime) as end_time
+        |FROM TABLE(
+        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+        |GROUP BY window_start, window_end, window_time
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = if (useTimestampLtz) {
+      Seq(
+        "2020-10-09T16:00:04.999Z,2020-10-09T16:00:01Z,2020-10-09T16:00:04Z",
+        "2020-10-09T16:00:09.999Z,2020-10-09T16:00:06Z,2020-10-09T16:00:08Z",
+        "2020-10-09T16:00:19.999Z,2020-10-09T16:00:16Z,2020-10-09T16:00:16Z",
+        "2020-10-09T16:00:34.999Z,2020-10-09T16:00:32Z,2020-10-09T16:00:34Z")
+    } else {
+      Seq(
+        "2020-10-10T00:00:04.999,2020-10-10T00:00:01,2020-10-10T00:00:04",
+        "2020-10-10T00:00:09.999,2020-10-10T00:00:06,2020-10-10T00:00:08",
+        "2020-10-10T00:00:19.999,2020-10-10T00:00:16,2020-10-10T00:00:16",
+        "2020-10-10T00:00:34.999,2020-10-10T00:00:32,2020-10-10T00:00:34")
+    }
+    assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n"))
+  }
 }
 
 object WindowAggregateITCase {