Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/22 07:35:44 UTC

[flink] branch master updated: [FLINK-16345][table-planner-blink] Fix computed column can not refer row time attribute column

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 54395d9  [FLINK-16345][table-planner-blink] Fix computed column can not refer row time attribute column
54395d9 is described below

commit 54395d9fcb3ab7b7ad6548e9003e24c24f9081c4
Author: Jark Wu <ja...@apache.org>
AuthorDate: Sun Mar 22 15:35:13 2020 +0800

    [FLINK-16345][table-planner-blink] Fix computed column can not refer row time attribute column
    
    This closes #11424
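
    Before this change, a computed column whose expression referenced the
    rowtime attribute (or PROCTIME()) failed during planning: the table's
    row type still carried time indicator types when the computed column
    projection was converted to RelNodes, so the derived row types did not
    match. A DDL of the following shape, condensed from the regression
    test added in this patch, reproduces the problem:

        CREATE TABLE src (
          ts TIMESTAMP(3),
          a INT,
          my_ts AS ts - INTERVAL '0.001' SECOND,  -- refers to the rowtime column
          WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
        ) WITH (...)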
---
 .../planner/plan/schema/CatalogSourceTable.scala   | 37 +++++++++++++++++++---
 .../planner/plan/stream/sql/TableScanTest.xml      | 34 ++++++++++++++++----
 .../planner/plan/stream/sql/TableScanTest.scala    | 19 +++++++++++
 3 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 12801cc..72764fe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.CatalogTable
 import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
 import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
@@ -30,6 +30,7 @@ import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}
 
 import java.util
 import java.util.{List => JList}
@@ -78,12 +79,16 @@ class CatalogSourceTable[T](
         .getPlanner
         .getContext
         .unwrap(classOf[FlinkContext])
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    // erase time indicator types in the rowType
+    val erasedRowType = eraseTimeIndicator(rowType, typeFactory)
 
     val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
     val tableSourceTable = new TableSourceTable[T](
       relOptSchema,
       schemaTable.getTableIdentifier,
-      rowType,
+      erasedRowType,
       statistic,
       tableSource,
       schemaTable.isStreamingMode,
@@ -106,7 +111,7 @@ class CatalogSourceTable[T](
     val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
 
     // 2. push computed column project
-    val fieldNames = rowType.getFieldNames.asScala
+    val fieldNames = erasedRowType.getFieldNames.asScala
     if (columnExprs.nonEmpty) {
       val fieldExprs = fieldNames
           .map { name =>
@@ -139,7 +144,7 @@ class CatalogSourceTable[T](
       }
       val rowtimeIndex = fieldNames.indexOf(rowtime)
       val watermarkRexNode = toRexFactory
-          .create(rowType)
+          .create(erasedRowType)
           .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
       relBuilder.watermark(rowtimeIndex, watermarkRexNode)
     }
@@ -179,4 +184,28 @@ class CatalogSourceTable[T](
 
     tableSource.asInstanceOf[TableSource[T]]
   }
+
+  /**
+   * Erases time indicators, i.e. converts rowtime and proctime types into regular timestamp types.
+   * This is required before converting this [[CatalogSourceTable]] into multiple RelNodes,
+   * otherwise the derived data types would mismatch.
+   */
+  private def eraseTimeIndicator(
+      relDataType: RelDataType,
+      factory: FlinkTypeFactory): RelDataType = {
+    val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
+    val fieldNames = logicalRowType.getFieldNames
+    val fieldTypes = logicalRowType.getFields.map { f =>
+      if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
+        val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
+        new TimestampType(
+          timeIndicatorType.isNullable,
+          TimestampKind.REGULAR,
+          timeIndicatorType.getPrecision)
+      } else {
+        f.getType
+      }
+    }
+    factory.buildRelNodeRowType(fieldNames, fieldTypes)
+  }
 }
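
For readers outside the planner code: rowtime and proctime columns are modeled
as TIMESTAMP(3) types that carry a time indicator kind, and two TIMESTAMP(3)
types with different kinds do not unify during type derivation, which is what
broke computed columns referring to them. A minimal sketch of the per-field
erasure, assuming only the blink planner's logical type classes:

    import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}

    // A rowtime attribute is a TIMESTAMP(3) marked with TimestampKind.ROWTIME.
    val rowtime = new TimestampType(true, TimestampKind.ROWTIME, 3)

    // eraseTimeIndicator above rebuilds each such field as a regular
    // TIMESTAMP with the same nullability and precision.
    val erased = new TimestampType(
      rowtime.isNullable, TimestampKind.REGULAR, rowtime.getPrecision)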
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 693a171..1c54d3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -52,6 +52,23 @@ Calc(select=[ts, a, b], where=[>(a, 1)])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testDDLWithComputedColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
++- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTableSourceScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM MyTable]]>
@@ -68,20 +85,25 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDDLWithComputedColumn">
+  <TestCase name="testDDLWithComputedColumnReferRowtime">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM t1]]>
+      <![CDATA[SELECT * FROM src WHERE a > 1]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
-+- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
+LogicalProject(ts=[$0], a=[$1], b=[$2], my_ts=[$3], proc=[$4])
++- LogicalFilter(condition=[>($1, 1)])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($0, 1:INTERVAL SECOND)])
+      +- LogicalProject(ts=[$0], a=[$1], b=[$2], my_ts=[-($0, 1:INTERVAL SECOND)], proc=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
-+- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+Calc(select=[ts, a, b, my_ts, PROCTIME_MATERIALIZE(proc) AS proc], where=[>(a, 1)])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1:INTERVAL SECOND)])
+   +- Calc(select=[ts, a, b, -(ts, 1:INTERVAL SECOND) AS my_ts, PROCTIME() AS proc])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
 ]]>
     </Resource>
   </TestCase>
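
One detail of the new planAfter worth noting: proc is declared as PROCTIME(), a
virtual processing-time attribute, and once it reaches the query's output it can
no longer stay virtual, so the planner wraps it in PROCTIME_MATERIALIZE to
evaluate it into a regular timestamp value. To inspect such plans locally, a
sketch assuming a streaming TableEnvironment tEnv with the src table from the
test below registered:

    // Prints the abstract syntax tree and the optimized physical plan.
    val plan: String = tEnv.explain(tEnv.sqlQuery("SELECT * FROM src WHERE a > 1"))
    println(plan)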
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index edc9c3b..0ae7cb7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -99,6 +99,25 @@ class TableScanTest extends TableTestBase {
   }
 
   @Test
+  def testDDLWithComputedColumnReferRowtime(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE src (
+        |  ts TIMESTAMP(3),
+        |  a INT,
+        |  b DOUBLE,
+        |  my_ts AS ts - INTERVAL '0.001' SECOND,
+        |  proc AS PROCTIME(),
+        |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin)
+    util.verifyPlan("SELECT * FROM src WHERE a > 1")
+  }
+
+  @Test
   def testKeywordsWithWatermarkComputedColumn(): Unit = {
     // Create table with field as atom expression.
     util.tableEnv.registerFunction("my_udf", Func0)