You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:17 UTC

[hudi] 10/10: [HUDI-4925] Should Force to use ExpressionPayload in MergeIntoTableCommand (#6355)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c48534d4bb1b74b8049e5da8ccd623e9445fb0af
Author: 冯健 <fe...@gmail.com>
AuthorDate: Fri Sep 30 06:34:00 2022 +0800

    [HUDI-4925] Should Force to use ExpressionPayload in MergeIntoTableCommand (#6355)
    
    
    Co-authored-by: jian.feng <ji...@shopee.com>
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  6 ++--
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 40 +++++++++++++++++++++-
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2761a00205..f0394ad379 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -509,7 +509,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = hoodieCatalogTable.tableLocation
-    val catalogProperties = hoodieCatalogTable.catalogProperties
+    // Force ExpressionPayload as the payload class (PAYLOAD_CLASS_NAME) for MERGE INTO, overriding any value set in the catalog properties
+    val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
     val tableConfig = hoodieCatalogTable.tableConfig
     val tableSchema = hoodieCatalogTable.tableSchema
     val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -523,14 +524,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
+    withSparkConf(sparkSession, catalogProperties) {
       Map(
         "path" -> path,
         RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
         PRECOMBINE_FIELD.key -> preCombineField,
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 8e6acd1be5..8a6aa9691d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -674,7 +674,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     }
   }
 
-  test ("Test Merge into with String cast to Double") {
+  test("Test Merge into with String cast to Double") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // Create a cow partitioned table.
@@ -713,4 +713,42 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test("Test Merge into where manually set DefaultHoodieRecordPayload") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a COW table that explicitly sets DefaultHoodieRecordPayload, then check that MERGE INTO overrides it with ExpressionPayload.
+      // If it is not overridden, this test fails because DefaultHoodieRecordPayload cannot promote int to long when a ts with an Integer value is inserted.
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           | ) using hudi
+           | tblproperties (
+           |  type = 'cow',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
+           | ) location '${tmp.getCanonicalPath}'
+         """.stripMargin)
+      // Insert data
+      spark.sql(s"insert into $tableName select 1, 'a1', 999")
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 'a2' as name, 1 as id, 1000 as ts
+           | ) as s0
+           | on t0.id = s0.id
+           | when matched then update set t0.name = s0.name, t0.ts = s0.ts
+           | when not matched then insert *
+         """.stripMargin
+      )
+      checkAnswer(s"select id,name,ts from $tableName")(
+        Seq(1, "a2", 1000)
+      )
+    }
+  }
 }