You are viewing a plain-text version of this content. The link to its canonical version (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@hudi.apache.org by zh...@apache.org on 2021/08/21 13:53:27 UTC

[hudi] branch master updated: Support referencing subquery with column aliases by table alias in merge into (#3380)

This is an automated email from the ASF dual-hosted git repository.

zhiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be8c1e4  Support referencing subquery with column aliases by table alias in merge into (#3380)
be8c1e4 is described below

commit be8c1e499fc94483c2a26687072942515b793368
Author: 董可伦 <do...@inspur.com>
AuthorDate: Sat Aug 21 21:53:16 2021 +0800

    Support referencing subquery with column aliases by table alias in merge into (#3380)
---
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 57 +++++++++++++++++++++-
 .../parser/HoodieSpark2ExtendedSqlAstBuilder.scala |  6 +--
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 88d2e97..92a2c63 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -175,7 +175,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
   }
 
   test("Test Merge With Complex Data Type") {
-    withTempDir{tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       spark.sql(
         s"""
@@ -320,4 +320,59 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
     }
   }
 
+  test("Test MergeInto For Source Table With Column Aliases") { // source subquery columns are named via its table alias, e.g. s0(id, ...)
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a fresh Hudi table keyed by 'id', with 'ts' as the pre-combine field
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // Name the source columns via the alias s0(...), incl. an extra input field 'flag' (inserts a new record)
+      val mergeSql =
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1, 'a1', 10, 1000, '1'
+           | ) s0(id,name,price,ts,flag)
+           | on s0.id = $tableName.id
+           | when matched and flag = '1' then update set
+           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when not matched and flag = '1' then insert *
+           |""".stripMargin
+
+      if (HoodieSqlUtils.isSpark3) { // Spark 3's parser rejects column aliases on the MERGE source; expect a parse error
+        checkException(mergeSql)( // NOTE(review): expected text uses \r\n, presuming CRLF line endings in mergeSql above -- confirm on LF checkouts
+            "\nColumns aliases are not allowed in MERGE.(line 5, pos 5)\n\n" +
+            "== SQL ==\n\r\n" +
+            s" merge into $tableName\r\n" +
+            " using (\r\n" +
+            "  select 1, 'a1', 10, 1000, '1'\r\n" +
+            " ) s0(id,name,price,ts,flag)\r\n" +
+            "-----^^^\n" +
+            s" on s0.id = $tableName.id\r\n" +
+            " when matched and flag = '1' then update set\r\n" +
+            " id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\r\n" +
+            " when not matched and flag = '1' then insert *\r\n"
+        )
+      } else { // Spark 2: the Hudi extended parser accepts the aliases, so the merge inserts exactly one row
+        spark.sql(mergeSql)
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000)
+        )
+      }
+    }
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala
index 4e38594..bbc9014 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala
@@ -185,12 +185,12 @@ class HoodieSpark2ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterface
     */
   protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
     if (tableAlias.strictIdentifier != null) {
-      val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan)
+      val alias = tableAlias.strictIdentifier.getText
       if (tableAlias.identifierList != null) {
         val columnNames = visitIdentifierList(tableAlias.identifierList)
-        UnresolvedSubqueryColumnAliases(columnNames, subquery)
+        SubqueryAlias(alias, UnresolvedSubqueryColumnAliases(columnNames, plan))
       } else {
-        subquery
+        SubqueryAlias(alias, plan)
       }
     } else {
       plan