You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/22 07:35:44 UTC
[flink] branch master updated: [FLINK-16345][table-planner-blink]
Fix computed column can not refer row time attribute column
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 54395d9 [FLINK-16345][table-planner-blink] Fix computed column can not refer row time attribute column
54395d9 is described below
commit 54395d9fcb3ab7b7ad6548e9003e24c24f9081c4
Author: Jark Wu <ja...@apache.org>
AuthorDate: Sun Mar 22 15:35:13 2020 +0800
[FLINK-16345][table-planner-blink] Fix computed column can not refer row time attribute column
This closes #11424
---
.../planner/plan/schema/CatalogSourceTable.scala | 37 +++++++++++++++++++---
.../planner/plan/stream/sql/TableScanTest.xml | 34 ++++++++++++++++----
.../planner/plan/stream/sql/TableScanTest.scala | 19 +++++++++++
3 files changed, 80 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 12801cc..72764fe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.catalog.CatalogTable
import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
@@ -30,6 +30,7 @@ import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}
import java.util
import java.util.{List => JList}
@@ -78,12 +79,16 @@ class CatalogSourceTable[T](
.getPlanner
.getContext
.unwrap(classOf[FlinkContext])
+ val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ // erase time indicator types in the rowType
+ val erasedRowType = eraseTimeIndicator(rowType, typeFactory)
val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
val tableSourceTable = new TableSourceTable[T](
relOptSchema,
schemaTable.getTableIdentifier,
- rowType,
+ erasedRowType,
statistic,
tableSource,
schemaTable.isStreamingMode,
@@ -106,7 +111,7 @@ class CatalogSourceTable[T](
val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
// 2. push computed column project
- val fieldNames = rowType.getFieldNames.asScala
+ val fieldNames = erasedRowType.getFieldNames.asScala
if (columnExprs.nonEmpty) {
val fieldExprs = fieldNames
.map { name =>
@@ -139,7 +144,7 @@ class CatalogSourceTable[T](
}
val rowtimeIndex = fieldNames.indexOf(rowtime)
val watermarkRexNode = toRexFactory
- .create(rowType)
+ .create(erasedRowType)
.convertToRexNode(watermarkSpec.get.getWatermarkExpr)
relBuilder.watermark(rowtimeIndex, watermarkRexNode)
}
@@ -179,4 +184,28 @@ class CatalogSourceTable[T](
tableSource.asInstanceOf[TableSource[T]]
}
+
+ /**
+ * Erases time indicators, i.e. converts rowtime and proctime type into regular timestamp type.
+ * This is required before converting this [[CatalogSourceTable]] into multiple RelNodes,
+ * otherwise the derived data types are mismatch.
+ */
+ private def eraseTimeIndicator(
+ relDataType: RelDataType,
+ factory: FlinkTypeFactory): RelDataType = {
+ val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
+ val fieldNames = logicalRowType.getFieldNames
+ val fieldTypes = logicalRowType.getFields.map { f =>
+ if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
+ val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
+ new TimestampType(
+ timeIndicatorType.isNullable,
+ TimestampKind.REGULAR,
+ timeIndicatorType.getPrecision)
+ } else {
+ f.getType
+ }
+ }
+ factory.buildRelNodeRowType(fieldNames, fieldTypes)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 693a171..1c54d3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -52,6 +52,23 @@ Calc(select=[ts, a, b], where=[>(a, 1)])
]]>
</Resource>
</TestCase>
+ <TestCase name="testDDLWithComputedColumn">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
++- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableSourceScan">
<Resource name="sql">
<![CDATA[SELECT * FROM MyTable]]>
@@ -68,20 +85,25 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
]]>
</Resource>
</TestCase>
- <TestCase name="testDDLWithComputedColumn">
+ <TestCase name="testDDLWithComputedColumnReferRowtime">
<Resource name="sql">
- <![CDATA[SELECT * FROM t1]]>
+ <![CDATA[SELECT * FROM src WHERE a > 1]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
-+- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
+LogicalProject(ts=[$0], a=[$1], b=[$2], my_ts=[$3], proc=[$4])
++- LogicalFilter(condition=[>($1, 1)])
+ +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($0, 1:INTERVAL SECOND)])
+ +- LogicalProject(ts=[$0], a=[$1], b=[$2], my_ts=[-($0, 1:INTERVAL SECOND)], proc=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
-+- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+Calc(select=[ts, a, b, my_ts, PROCTIME_MATERIALIZE(proc) AS proc], where=[>(a, 1)])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1:INTERVAL SECOND)])
+ +- Calc(select=[ts, a, b, -(ts, 1:INTERVAL SECOND) AS my_ts, PROCTIME() AS proc])
+ +- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index edc9c3b..0ae7cb7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -99,6 +99,25 @@ class TableScanTest extends TableTestBase {
}
@Test
+ def testDDLWithComputedColumnReferRowtime(): Unit = {
+ util.addTable(
+ """
+ |CREATE TABLE src (
+ | ts TIMESTAMP(3),
+ | a INT,
+ | b DOUBLE,
+ | my_ts AS ts - INTERVAL '0.001' SECOND,
+ | proc AS PROCTIME(),
+ | WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ util.verifyPlan("SELECT * FROM src WHERE a > 1")
+ }
+
+ @Test
def testKeywordsWithWatermarkComputedColumn(): Unit = {
// Create table with field as atom expression.
util.tableEnv.registerFunction("my_udf", Func0)