Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/12 06:38:25 UTC

[flink] branch release-1.10 updated: [FLINK-16526][table-planner-blink] Fix exception when computed column expression references a keyword column name

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 884edd6  [FLINK-16526][table-planner-blink] Fix exception when computed column expression references a keyword column name
884edd6 is described below

commit 884edd6dec549450ac44eb80d83de85eb50dc11b
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Mar 12 14:34:55 2020 +0800

    [FLINK-16526][table-planner-blink] Fix exception when computed column expression references a keyword column name
    
    This is a followup fix of FLINK-16018.
    
    This closes #11380
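
A minimal sketch of the scenario this fixes, written against the Flink 1.10 Table API. It mirrors the DDL added in TableScanTest below; the COLLECTION connector and the 'is-bounded' option come from the test-only collection source in this module, and the class name is illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KeywordComputedColumnSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // Column names `time`, `current_time` and `timestamp` are SQL keywords; the
            // computed columns and the watermark reference them, so the planner has to
            // serialize these expressions into the table schema and re-parse them later.
            tEnv.sqlUpdate(
                "CREATE TABLE t1 (\n" +
                "  a INT,\n" +
                "  b VARCHAR,\n" +
                "  `time` TIME,\n" +
                "  mytime AS `time`,\n" +
                "  `current_time` AS CURRENT_TIME,\n" +
                "  json_row ROW<`timestamp` TIMESTAMP(3)>,\n" +
                "  `timestamp` AS json_row.`timestamp`,\n" +
                "  WATERMARK FOR `timestamp` AS `timestamp`\n" +
                ") WITH (\n" +
                "  'connector' = 'COLLECTION',\n" +
                "  'is-bounded' = 'false'\n" +
                ")");

            // Before this fix, planning a query over t1 could fail with a parse error,
            // because the expression strings stored in the schema were not quoted the
            // way the Flink parser expects for keyword column names.
            tEnv.sqlQuery("SELECT * FROM t1");
        }
    }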
---
 .../operations/SqlToOperationConverter.java        | 21 ++++++++++---
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  2 +-
 .../planner/plan/stream/sql/TableScanTest.xml      | 35 ++++++++++++----------
 .../table/planner/catalog/CatalogTableITCase.scala | 18 ++++++-----
 .../planner/plan/stream/sql/TableScanTest.scala    | 23 ++++++++++++++
 5 files changed, 71 insertions(+), 28 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 237c2b7..9899836 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,10 +74,13 @@ import org.apache.flink.util.StringUtils;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.validate.SqlValidator;
 
 import java.util.HashMap;
@@ -98,8 +101,8 @@ import java.util.stream.Collectors;
  * {@link org.apache.flink.table.delegation.Planner}.
  */
 public class SqlToOperationConverter {
-	private FlinkPlannerImpl flinkPlanner;
-	private CatalogManager catalogManager;
+	private final FlinkPlannerImpl flinkPlanner;
+	private final CatalogManager catalogManager;
 
 	//~ Constructors -----------------------------------------------------------
 
@@ -492,7 +495,7 @@ public class SqlToOperationConverter {
 				builder.field(call.operand(1).toString(),
 					TypeConversions.fromLogicalToDataType(
 						FlinkTypeFactory.toLogicalType(validatedType)),
-					validatedExpr.toString());
+					getQuotedSqlString(validatedExpr));
 				// add computed column into all field list
 				String fieldName = call.operand(1).toString();
 				allFieldNamesToTypes.put(fieldName, validatedType);
@@ -511,12 +514,22 @@ public class SqlToOperationConverter {
 			DataType exprDataType = TypeConversions.fromLogicalToDataType(
 				FlinkTypeFactory.toLogicalType(validatedType));
 			// use the qualified SQL expression string
-			builder.watermark(rowtimeAttribute, validated.toString(), exprDataType);
+			builder.watermark(rowtimeAttribute, getQuotedSqlString(validated), exprDataType);
 		});
 
 		return builder.build();
 	}
 
+	private String getQuotedSqlString(SqlNode sqlNode) {
+		SqlParser.Config parserConfig = flinkPlanner.config().getParserConfig();
+		SqlDialect dialect = new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
+			.withQuotedCasing(parserConfig.unquotedCasing())
+			.withConformance(parserConfig.conformance())
+			.withUnquotedCasing(parserConfig.unquotedCasing())
+			.withIdentifierQuoteString(parserConfig.quoting().string));
+		return sqlNode.toSqlString(dialect).getSql();
+	}
+
 	private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
 		// transform to a relational tree
 		RelRoot relational = planner.rel(validated);
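
The new getQuotedSqlString helper builds a serialization dialect out of the planner's parser configuration, so the expression strings stored in the table schema keep the identifier quote string (backticks by default) and casing that the Flink parser will later use to re-parse them; making config a val in FlinkPlannerImpl (next file) is what exposes that configuration to the converter. Below is a standalone sketch of the same idea using only Calcite, with a hand-built parser config and an illustrative expression rather than Flink's validated SqlNode:

    import org.apache.calcite.avatica.util.Quoting;
    import org.apache.calcite.sql.SqlDialect;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.dialect.CalciteSqlDialect;
    import org.apache.calcite.sql.parser.SqlParseException;
    import org.apache.calcite.sql.parser.SqlParser;

    public class QuotedExpressionSketch {
        public static void main(String[] args) throws SqlParseException {
            // Parse a computed-column style expression that references a keyword column.
            SqlParser.Config config = SqlParser.configBuilder()
                .setQuoting(Quoting.BACK_TICK)
                .build();
            SqlNode expr = SqlParser.create("`json_row`.`timestamp`", config).parseExpression();

            // Derive the dialect from the parser config, as the committed helper does
            // (the quoted casing is taken from the unquoted casing there as well).
            SqlDialect dialect = new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
                .withQuotedCasing(config.unquotedCasing())
                .withConformance(config.conformance())
                .withUnquotedCasing(config.unquotedCasing())
                .withIdentifierQuoteString(config.quoting().string));

            // Serializing with this dialect (instead of SqlNode#toString(), which the
            // fix replaces) keeps the keyword column quoted with the parser's quote
            // character, so the stored string re-parses cleanly.
            System.out.println(expr.toSqlString(dialect).getSql());
        }
    }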
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 0ad903b..6426436 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -46,7 +46,7 @@ import scala.collection.JavaConverters._
   * The main difference is that we do not create a new RelOptPlanner in the ready() method.
   */
 class FlinkPlannerImpl(
-    config: FrameworkConfig,
+    val config: FrameworkConfig,
     catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
     typeFactory: FlinkTypeFactory,
     cluster: RelOptCluster) {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index bce11d9..693a171 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -19,26 +19,22 @@ limitations under the License.
   <TestCase name="testDataStreamScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM DataStreamTable]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalTableScan(table=[[default_catalog, default_database, DataStreamTable]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 DataStreamScan(table=[[default_catalog, default_database, DataStreamTable]], fields=[a, b, c])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testDDLTableScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM src WHERE a > 1]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
@@ -47,7 +43,6 @@ LogicalProject(ts=[$0], a=[$1], b=[$2])
    +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($0, 1:INTERVAL SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
@@ -55,52 +50,44 @@ Calc(select=[ts, a, b], where=[>(a, 1)])
 +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1:INTERVAL SECOND)])
    +- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testTableSourceScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM MyTable]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testDDLWithComputedColumn">
     <Resource name="sql">
       <![CDATA[SELECT * FROM t1]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testDDLWithWatermarkComputedColumn">
     <Resource name="sql">
       <![CDATA[SELECT * FROM t1]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
@@ -109,7 +96,6 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
    +- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
       +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
@@ -117,7 +103,26 @@ WatermarkAssigner(rowtime=[d], watermark=[-(d, 1:INTERVAL SECOND)])
 +- Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
    +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
+    </Resource>
+  </TestCase>
+  <TestCase name="testKeywordsWithWatermarkComputedColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], time=[$2], mytime=[$3], current_time=[$4], json_row=[$5], timestamp=[$6])
++- LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[$6])
+   +- LogicalProject(a=[$0], b=[$1], time=[$2], mytime=[$2], current_time=[CURRENT_TIME], json_row=[$3], timestamp=[$3.timestamp])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b, time, json_row)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+WatermarkAssigner(rowtime=[timestamp], watermark=[timestamp])
++- Calc(select=[a, b, time, time AS mytime, CURRENT_TIME() AS current_time, json_row, json_row.timestamp AS timestamp])
+   +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b, time, json_row)]]], fields=[a, b, time, json_row])
+]]>
     </Resource>
   </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 6b16dfc..9e2d628 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -408,11 +408,11 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
       toRow(2, "2019-09-10 9:23:44")
     )
     val expected = List(
-      toRow(1, "1990-02-10 12:34:56", 1),
-      toRow(2, "2019-09-10 9:23:41", 2),
-      toRow(3, "2019-09-10 9:23:42", 3),
-      toRow(1, "2019-09-10 9:23:43", 1),
-      toRow(2, "2019-09-10 9:23:44", 2)
+      toRow(1, "1990-02-10 12:34:56", 1, "1990-02-10 12:34:56"),
+      toRow(2, "2019-09-10 9:23:41", 2, "2019-09-10 9:23:41"),
+      toRow(3, "2019-09-10 9:23:42", 3, "2019-09-10 9:23:42"),
+      toRow(1, "2019-09-10 9:23:43", 1, "2019-09-10 9:23:43"),
+      toRow(2, "2019-09-10 9:23:44", 2, "2019-09-10 9:23:44")
     )
     TestCollectionTableFactory.initData(sourceData)
     tableEnv.registerFunction("my_udf", Func0)
@@ -421,7 +421,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
         |create table t1(
         |  a int,
         |  `time` varchar,
-        |  c as my_udf(a)
+        |  c as my_udf(a),
+        |  d as `time`
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -431,7 +432,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
         |create table t2(
         |  a int,
         |  `time` varchar,
-        |  c int not null
+        |  c int not null,
+        |  d varchar
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -439,7 +441,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     val query =
       """
         |insert into t2
-        |select t1.a, t1.`time`, t1.c from t1
+        |select t1.a, t1.`time`, t1.c, t1.d from t1
       """.stripMargin
     tableEnv.sqlUpdate(sourceDDL)
     tableEnv.sqlUpdate(sinkDDL)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 017cc86..edc9c3b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -97,4 +97,27 @@ class TableScanTest extends TableTestBase {
        """.stripMargin)
     util.verifyPlan("SELECT * FROM t1")
   }
+
+  @Test
+  def testKeywordsWithWatermarkComputedColumn(): Unit = {
+    // Create table with field as atom expression.
+    util.tableEnv.registerFunction("my_udf", Func0)
+    util.addTable(
+      s"""
+         |create table t1(
+         |  a int,
+         |  b varchar,
+         |  `time` time,
+         |  mytime as `time`,
+         |  `current_time` as current_time,
+         |  json_row ROW<`timestamp` TIMESTAMP(3)>,
+         |  `timestamp` AS json_row.`timestamp`,
+         |  WATERMARK FOR `timestamp` AS `timestamp`
+         |) with (
+         |  'connector' = 'COLLECTION',
+         |  'is-bounded' = 'false'
+         |)
+       """.stripMargin)
+    util.verifyPlan("SELECT * FROM t1")
+  }
 }