You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/06/06 06:18:22 UTC

[flink] branch release-1.15 updated: [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e0af037d991 [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
e0af037d991 is described below

commit e0af037d9910b6cfd4cc3fd8937289f939bb6d9b
Author: yangxin <ya...@pingcap.com>
AuthorDate: Tue May 31 10:42:49 2022 +0800

    [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
    
    This closes #19847
    
    (cherry picked from commit 9bcc7fd20420bbf90f4b67d98c521a8ddf775d4e)
---
 .../planner/calcite/PreValidateReWriter.scala      |  9 +++++---
 .../planner/plan/stream/sql/TableSinkTest.xml      | 16 +++++++++++++
 .../planner/plan/stream/sql/TableSinkTest.scala    | 17 ++++++++++++++
 .../runtime/stream/table/TableSinkITCase.scala     | 27 ++++++++++++++++++++++
 4 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index e85ce719c76..86e922b568d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -30,8 +30,8 @@ import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
@@ -119,7 +119,10 @@ object PreValidateReWriter {
       source: SqlCall,
       partitions: SqlNodeList): SqlCall = {
     val calciteCatalogReader = validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
-    val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+    val names = sqlInsert.getTargetTable match {
+      case si: SqlIdentifier => si.names
+      case st: SqlTableRef => st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
+    }
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
       // There is no table exists in current catalog,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 2b8b78770c8..4ae3d066174 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -779,4 +779,20 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
 ]]>
     </Resource>
   </TestCase>
+	<TestCase name="testInsertWithTargetColumnsAndSqlHint">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
++- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+		</Resource>
+		<Resource name="optimized rel plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
++- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+		</Resource>
+	</TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index a4f43257447..69621dc9b42 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -57,6 +57,23 @@ class TableSinkTest extends TableTestBase {
       |)
       |""".stripMargin)
 
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE appendSink (
+                     |  `a` BIGINT,
+                     |  `b` STRING
+                     |) WITH (
+                     |  'connector' = 'values',
+                     |  'sink-insert-only' = 'false'
+                     |)
+                     |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(a, b) SELECT a + b, c FROM MyTable")
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
+
   @Test
   def testInsertMismatchTypeForEmptyChar(): Unit = {
     util.addTable(s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 0218edcd5f2..f7e71141fc3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -86,6 +86,33 @@ class TableSinkITCase extends StreamingTestBase {
     assertEquals(expected.sorted, result.sorted)
   }
 
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    val t = env
+      .fromCollection(smallTupleData3)
+      .toTable(tEnv, 'id, 'num, 'text)
+    tEnv.createTemporaryView("src", t)
+
+    tEnv.executeSql(s"""
+                       |CREATE TABLE appendSink (
+                       |  `t` INT,
+                       |  `num` BIGINT,
+                       |  `text` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'sink-insert-only' = 'true'
+                       |)
+                       |""".stripMargin)
+    tEnv
+      .executeSql(
+        "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(t, num, text) SELECT id, num, text FROM src")
+      .await()
+
+    val result = TestValuesTableFactory.getResults("appendSink")
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, result.sorted)
+  }
+
   @Test
   def testAppendSinkWithNestedRow(): Unit = {
     val t = env