You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2023/01/12 08:29:23 UTC
[hudi] branch master updated: [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d65668611e3 [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
d65668611e3 is described below
commit d65668611e3e0f514d2b20fc3cf0f360146f532e
Author: StreamingFlames <18...@163.com>
AuthorDate: Thu Jan 12 16:29:15 2023 +0800
[HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 21 +++--
.../command/InsertIntoHoodieTableCommand.scala | 18 ++--
.../apache/spark/sql/hudi/TestInsertTable.scala | 104 +++++++++++++++++++++
.../spark/sql/hudi/catalog/HoodieCatalog.scala | 4 +-
.../sql/hudi/catalog/HoodieInternalV2Table.scala | 18 ++--
5 files changed, 135 insertions(+), 30 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index fcba6e310dc..db2b93eda08 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -95,7 +95,8 @@ trait ProvidesHoodieConfig extends Logging {
*/
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
- isOverwrite: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {
@@ -139,24 +140,24 @@ trait ProvidesHoodieConfig extends Logging {
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
val operation =
- (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
- case (true, _, _, false, _) =>
+ (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+ case (true, _, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
- case (true, true, _, _, true) =>
+ case (true, true, _, _, _, true) =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
- case (true, _, true, _, _) =>
+ case (true, _, _, true, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
- case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
- case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
- case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
- case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+ case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
- case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 0228e5ddcf7..2e4c1db099e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -86,16 +86,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val catalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
- // NOTE: In case of partitioned table we override specified "overwrite" parameter
- // to instead append to the dataset
- val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
- SaveMode.Overwrite
+ var mode = SaveMode.Append
+ var isOverWriteTable = false
+ var isOverWritePartition = false
+ if (overwrite && catalogTable.partitionFields.isEmpty) {
+ // insert overwrite non-partition table
+ mode = SaveMode.Overwrite
+ isOverWriteTable = true
} else {
- SaveMode.Append
+ // for insert into or insert overwrite partition we use append mode.
+ mode = SaveMode.Append
+ isOverWritePartition = overwrite
}
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+
val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 73aaabc0d8f..b6444b52b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -443,6 +444,109 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
+ test("Test Insert Overwrite Table for V2 Table") {
+ withSQLConf("hoodie.schema.on.read.enable" -> "true") {
+ withRecordType()(withTempDir { tmp =>
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id', preCombineField='dt')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+
+ // Test insert overwrite table
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | values(1, 'a1', 10.0, 1000, '2021-01-05')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+
+ // Insert overwrite table
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | values (2, 'a2', 10.0, 1000, '2021-01-06')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert overwrite static partition
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName partition(dt = '2021-01-05')
+ | select * from (select 2 , 'a2', 12.0, 1000) limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+ Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert data from another table
+ val tblNonPartition = generateTableName
+ spark.sql(
+ s"""
+ | create table $tblNonPartition (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | tblproperties (primaryKey = 'id')
+ | location '${tmp.getCanonicalPath}/$tblNonPartition'
+ """.stripMargin)
+ spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName partition(dt ='2021-01-04')
+ | select * from $tblNonPartition limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-04"),
+ Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert overwrite partitioned table, all partitions will be truncated
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName " +
+ s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
+ Seq(3, "a1", 10.0, 1000, "2021-01-04")
+ )
+
+ // Test insert overwrite non-partitioned table
+ spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
+ checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+ Seq(2, "a2", 10.0, 1000)
+ )
+
+ spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
+ checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+ Seq(2, "a2", 10.0, 2000)
+ )
+ }
+ })
+ }
+ }
+
+
test("Test Different Type of Partition Column") {
withRecordType()(withTempDir { tmp =>
val typeAndValue = Seq(
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index eeef56d3cff..6d3610db21e 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -293,13 +293,13 @@ class HoodieCatalog extends DelegatingCatalogExtension
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
- saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options))
+ saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, options))
CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false)
} else if (sourceQuery.isEmpty) {
saveSourceDF(sourceQuery, tableDesc.properties)
new CreateHoodieTableCommand(tableDesc, false).run(spark)
} else {
- saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty))
+ saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, Map.empty))
new CreateHoodieTableCommand(tableDesc, false).run(spark)
}
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index 9968095f3a5..b41c7456b71 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -89,15 +89,16 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
spark: SparkSession)
extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {
- private var forceOverwrite = false
+ private var overwriteTable = false
+ private var overwritePartition = false
override def truncate(): HoodieV1WriteBuilder = {
- forceOverwrite = true
+ overwriteTable = true
this
}
override def overwrite(filters: Array[Filter]): WriteBuilder = {
- forceOverwrite = true
+ overwritePartition = true
this
}
@@ -105,17 +106,10 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
override def toInsertableRelation: InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- val mode = if (forceOverwrite && hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
- SaveMode.Overwrite
- } else {
- // for insert into or insert overwrite partition we use append mode.
- SaveMode.Append
- }
alignOutputColumns(data).write.format("org.apache.hudi")
- .mode(mode)
+ .mode(SaveMode.Append)
.options(buildHoodieConfig(hoodieCatalogTable) ++
- buildHoodieInsertConfig(hoodieCatalogTable, spark, forceOverwrite, Map.empty, Map.empty))
+ buildHoodieInsertConfig(hoodieCatalogTable, spark, overwritePartition, overwriteTable, Map.empty, Map.empty))
.save()
}
}