You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/06/06 06:18:22 UTC
[flink] branch release-1.15 updated: [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e0af037d991 [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
e0af037d991 is described below
commit e0af037d9910b6cfd4cc3fd8937289f939bb6d9b
Author: yangxin <ya...@pingcap.com>
AuthorDate: Tue May 31 10:42:49 2022 +0800
[FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
This closes #19847
(cherry picked from commit 9bcc7fd20420bbf90f4b67d98c521a8ddf775d4e)
---
.../planner/calcite/PreValidateReWriter.scala | 9 +++++---
.../planner/plan/stream/sql/TableSinkTest.xml | 16 +++++++++++++
.../planner/plan/stream/sql/TableSinkTest.scala | 17 ++++++++++++++
.../runtime/stream/table/TableSinkITCase.scala | 27 ++++++++++++++++++++++
4 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index e85ce719c76..86e922b568d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -30,8 +30,8 @@ import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
@@ -119,7 +119,10 @@ object PreValidateReWriter {
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
val calciteCatalogReader = validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
- val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+ val names = sqlInsert.getTargetTable match {
+ case si: SqlIdentifier => si.names
+ case st: SqlTableRef => st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
+ }
val table = calciteCatalogReader.getTable(names)
if (table == null) {
// No table exists in the current catalog,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 2b8b78770c8..4ae3d066174 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -779,4 +779,20 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
]]>
</Resource>
</TestCase>
+ <TestCase name="testInsertWithTargetColumnsAndSqlHint">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
++- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
++- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index a4f43257447..69621dc9b42 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -57,6 +57,23 @@ class TableSinkTest extends TableTestBase {
|)
|""".stripMargin)
+ @Test
+ def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE appendSink (
+ | `a` BIGINT,
+ | `b` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+ val stmtSet = util.tableEnv.createStatementSet()
+ stmtSet.addInsertSql(
+ "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(a, b) SELECT a + b, c FROM MyTable")
+ util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+ }
+
@Test
def testInsertMismatchTypeForEmptyChar(): Unit = {
util.addTable(s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 0218edcd5f2..f7e71141fc3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -86,6 +86,33 @@ class TableSinkITCase extends StreamingTestBase {
assertEquals(expected.sorted, result.sorted)
}
+ @Test
+ def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+ val t = env
+ .fromCollection(smallTupleData3)
+ .toTable(tEnv, 'id, 'num, 'text)
+ tEnv.createTemporaryView("src", t)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE appendSink (
+ | `t` INT,
+ | `num` BIGINT,
+ | `text` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |)
+ |""".stripMargin)
+ tEnv
+ .executeSql(
+ "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(t, num, text) SELECT id, num, text FROM src")
+ .await()
+
+ val result = TestValuesTableFactory.getResults("appendSink")
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, result.sorted)
+ }
+
@Test
def testAppendSinkWithNestedRow(): Unit = {
val t = env