You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:17 UTC
[hudi] 10/10: [HUDI-4925] Should Force to use ExpressionPayload in MergeIntoTableCommand (#6355)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c48534d4bb1b74b8049e5da8ccd623e9445fb0af
Author: 冯健 <fe...@gmail.com>
AuthorDate: Fri Sep 30 06:34:00 2022 +0800
[HUDI-4925] Should Force to use ExpressionPayload in MergeIntoTableCommand (#6355)
Co-authored-by: jian.feng <ji...@shopee.com>
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 6 ++--
.../spark/sql/hudi/TestMergeIntoTable2.scala | 40 +++++++++++++++++++++-
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2761a00205..f0394ad379 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -509,7 +509,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = hoodieCatalogTable.tableLocation
- val catalogProperties = hoodieCatalogTable.catalogProperties
+ // Force ExpressionPayload as the payload class (PAYLOAD_CLASS_NAME) in MergeIntoHoodieTableCommand, overriding any value in the catalog properties
+ val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -523,14 +524,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
- withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
+ withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> preCombineField,
TBL_NAME.key -> hoodieCatalogTable.tableName,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 8e6acd1be5..8a6aa9691d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -674,7 +674,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}
- test ("Test Merge into with String cast to Double") {
+ test("Test Merge into with String cast to Double") {
withTempDir { tmp =>
val tableName = generateTableName
// Create a cow partitioned table.
@@ -713,4 +713,42 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
)
}
}
+
+ test("Test Merge into where manually set DefaultHoodieRecordPayload") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a cow table with the default payload class and check whether it is overridden by ExpressionPayload.
+ // If it is not, this unit test cannot pass, since DefaultHoodieRecordPayload cannot promote int to long when a ts with an Integer value is inserted.
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | ts long
+ | ) using hudi
+ | tblproperties (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
+ | ) location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+ // Insert data
+ spark.sql(s"insert into $tableName select 1, 'a1', 999")
+ spark.sql(
+ s"""
+ | merge into $tableName as t0
+ | using (
+ | select 'a2' as name, 1 as id, 1000 as ts
+ | ) as s0
+ | on t0.id = s0.id
+ | when matched then update set t0.name = s0.name, t0.ts = s0.ts
+ | when not matched then insert *
+ """.stripMargin
+ )
+ checkAnswer(s"select id,name,ts from $tableName")(
+ Seq(1, "a2", 1000)
+ )
+ }
+ }
}