You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/12 06:38:25 UTC
[flink] branch release-1.10 updated:
[FLINK-16526][table-planner-blink] Fix exception when computed column
expression references a keyword column name
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 884edd6 [FLINK-16526][table-planner-blink] Fix exception when computed column expression references a keyword column name
884edd6 is described below
commit 884edd6dec549450ac44eb80d83de85eb50dc11b
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Mar 12 14:34:55 2020 +0800
[FLINK-16526][table-planner-blink] Fix exception when computed column expression references a keyword column name
This is a follow-up fix for FLINK-16018.
This closes #11380
---
.../operations/SqlToOperationConverter.java | 21 ++++++++++---
.../table/planner/calcite/FlinkPlannerImpl.scala | 2 +-
.../planner/plan/stream/sql/TableScanTest.xml | 35 ++++++++++++----------
.../table/planner/catalog/CatalogTableITCase.scala | 18 ++++++-----
.../planner/plan/stream/sql/TableScanTest.scala | 23 ++++++++++++++
5 files changed, 71 insertions(+), 28 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 237c2b7..9899836 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,10 +74,13 @@ import org.apache.flink.util.StringUtils;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;
import java.util.HashMap;
@@ -98,8 +101,8 @@ import java.util.stream.Collectors;
* {@link org.apache.flink.table.delegation.Planner}.
*/
public class SqlToOperationConverter {
- private FlinkPlannerImpl flinkPlanner;
- private CatalogManager catalogManager;
+ private final FlinkPlannerImpl flinkPlanner;
+ private final CatalogManager catalogManager;
//~ Constructors -----------------------------------------------------------
@@ -492,7 +495,7 @@ public class SqlToOperationConverter {
builder.field(call.operand(1).toString(),
TypeConversions.fromLogicalToDataType(
FlinkTypeFactory.toLogicalType(validatedType)),
- validatedExpr.toString());
+ getQuotedSqlString(validatedExpr));
// add computed column into all field list
String fieldName = call.operand(1).toString();
allFieldNamesToTypes.put(fieldName, validatedType);
@@ -511,12 +514,22 @@ public class SqlToOperationConverter {
DataType exprDataType = TypeConversions.fromLogicalToDataType(
FlinkTypeFactory.toLogicalType(validatedType));
// use the qualified SQL expression string
- builder.watermark(rowtimeAttribute, validated.toString(), exprDataType);
+ builder.watermark(rowtimeAttribute, getQuotedSqlString(validated), exprDataType);
});
return builder.build();
}
+ /**
+ * Unparses the given {@link SqlNode} into a SQL string using a dialect derived from the
+ * planner's parser configuration (identifier quote string, quoted/unquoted casing and
+ * conformance). Intended to keep identifiers that collide with SQL keywords quoted in the
+ * stored expression string, so that the expression can be parsed again later.
+ *
+ * @param sqlNode the (validated) SQL node to convert to text
+ * @return the SQL text of {@code sqlNode} rendered with the derived dialect
+ */
+ private String getQuotedSqlString(SqlNode sqlNode) {
+ SqlParser.Config parserConfig = flinkPlanner.config().getParserConfig();
+ // Mirror each parser setting onto its dialect counterpart; quoted casing must come from
+ // quotedCasing(), not unquotedCasing(), otherwise the two diverge as soon as the parser
+ // is configured with different casings for quoted and unquoted identifiers.
+ SqlDialect dialect = new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
+ .withQuotedCasing(parserConfig.quotedCasing())
+ .withConformance(parserConfig.conformance())
+ .withUnquotedCasing(parserConfig.unquotedCasing())
+ .withIdentifierQuoteString(parserConfig.quoting().string));
+ return sqlNode.toSqlString(dialect).getSql();
+ }
+
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
// transform to a relational tree
RelRoot relational = planner.rel(validated);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 0ad903b..6426436 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -46,7 +46,7 @@ import scala.collection.JavaConverters._
* The main difference is that we do not create a new RelOptPlanner in the ready() method.
*/
class FlinkPlannerImpl(
- config: FrameworkConfig,
+ val config: FrameworkConfig,
catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
typeFactory: FlinkTypeFactory,
cluster: RelOptCluster) {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index bce11d9..693a171 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -19,26 +19,22 @@ limitations under the License.
<TestCase name="testDataStreamScan">
<Resource name="sql">
<![CDATA[SELECT * FROM DataStreamTable]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, DataStreamTable]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
DataStreamScan(table=[[default_catalog, default_database, DataStreamTable]], fields=[a, b, c])
]]>
-
</Resource>
</TestCase>
<TestCase name="testDDLTableScan">
<Resource name="sql">
<![CDATA[SELECT * FROM src WHERE a > 1]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
@@ -47,7 +43,6 @@ LogicalProject(ts=[$0], a=[$1], b=[$2])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($0, 1:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
@@ -55,52 +50,44 @@ Calc(select=[ts, a, b], where=[>(a, 1)])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
]]>
-
</Resource>
</TestCase>
<TestCase name="testTableSourceScan">
<Resource name="sql">
<![CDATA[SELECT * FROM MyTable]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
-
</Resource>
</TestCase>
<TestCase name="testDDLWithComputedColumn">
<Resource name="sql">
<![CDATA[SELECT * FROM t1]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
+- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
+- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
]]>
-
</Resource>
</TestCase>
<TestCase name="testDDLWithWatermarkComputedColumn">
<Resource name="sql">
<![CDATA[SELECT * FROM t1]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
@@ -109,7 +96,6 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
+- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
@@ -117,7 +103,26 @@ WatermarkAssigner(rowtime=[d], watermark=[-(d, 1:INTERVAL SECOND)])
+- Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
+- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]], fields=[a, b])
]]>
-
+ </Resource>
+ </TestCase>
+ <TestCase name="testKeywordsWithWatermarkComputedColumn">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], time=[$2], mytime=[$3], current_time=[$4], json_row=[$5], timestamp=[$6])
++- LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[$6])
+ +- LogicalProject(a=[$0], b=[$1], time=[$2], mytime=[$2], current_time=[CURRENT_TIME], json_row=[$3], timestamp=[$3.timestamp])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b, time, json_row)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+WatermarkAssigner(rowtime=[timestamp], watermark=[timestamp])
++- Calc(select=[a, b, time, time AS mytime, CURRENT_TIME() AS current_time, json_row, json_row.timestamp AS timestamp])
+ +- TableSourceScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b, time, json_row)]]], fields=[a, b, time, json_row])
+]]>
</Resource>
</TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 6b16dfc..9e2d628 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -408,11 +408,11 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
toRow(2, "2019-09-10 9:23:44")
)
val expected = List(
- toRow(1, "1990-02-10 12:34:56", 1),
- toRow(2, "2019-09-10 9:23:41", 2),
- toRow(3, "2019-09-10 9:23:42", 3),
- toRow(1, "2019-09-10 9:23:43", 1),
- toRow(2, "2019-09-10 9:23:44", 2)
+ toRow(1, "1990-02-10 12:34:56", 1, "1990-02-10 12:34:56"),
+ toRow(2, "2019-09-10 9:23:41", 2, "2019-09-10 9:23:41"),
+ toRow(3, "2019-09-10 9:23:42", 3, "2019-09-10 9:23:42"),
+ toRow(1, "2019-09-10 9:23:43", 1, "2019-09-10 9:23:43"),
+ toRow(2, "2019-09-10 9:23:44", 2, "2019-09-10 9:23:44")
)
TestCollectionTableFactory.initData(sourceData)
tableEnv.registerFunction("my_udf", Func0)
@@ -421,7 +421,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
|create table t1(
| a int,
| `time` varchar,
- | c as my_udf(a)
+ | c as my_udf(a),
+ | d as `time`
|) with (
| 'connector' = 'COLLECTION'
|)
@@ -431,7 +432,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
|create table t2(
| a int,
| `time` varchar,
- | c int not null
+ | c int not null,
+ | d varchar
|) with (
| 'connector' = 'COLLECTION'
|)
@@ -439,7 +441,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
val query =
"""
|insert into t2
- |select t1.a, t1.`time`, t1.c from t1
+ |select t1.a, t1.`time`, t1.c, t1.d from t1
""".stripMargin
tableEnv.sqlUpdate(sourceDDL)
tableEnv.sqlUpdate(sinkDDL)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 017cc86..edc9c3b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -97,4 +97,27 @@ class TableScanTest extends TableTestBase {
""".stripMargin)
util.verifyPlan("SELECT * FROM t1")
}
+
+ @Test
+ def testKeywordsWithWatermarkComputedColumn(): Unit = {
+ // Verifies the plan for a table whose physical columns, computed columns and watermark
+ // use back-quoted names that collide with SQL keywords (`time`, `current_time`, `timestamp`),
+ // including a computed column that reads a keyword-named field of a nested ROW.
+ // NOTE(review): my_udf is registered here but is not referenced by the DDL below —
+ // confirm whether the registration is still needed.
+ util.tableEnv.registerFunction("my_udf", Func0)
+ util.addTable(
+ s"""
+ |create table t1(
+ | a int,
+ | b varchar,
+ | `time` time,
+ | mytime as `time`,
+ | `current_time` as current_time,
+ | json_row ROW<`timestamp` TIMESTAMP(3)>,
+ | `timestamp` AS json_row.`timestamp`,
+ | WATERMARK FOR `timestamp` AS `timestamp`
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ util.verifyPlan("SELECT * FROM t1")
+ }
}