Posted to commits@hudi.apache.org by le...@apache.org on 2023/01/12 08:29:23 UTC

[hudi] branch master updated: [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d65668611e3 [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
d65668611e3 is described below

commit d65668611e3e0f514d2b20fc3cf0f360146f532e
Author: StreamingFlames <18...@163.com>
AuthorDate: Thu Jan 12 16:29:15 2023 +0800

    [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  21 +++--
 .../command/InsertIntoHoodieTableCommand.scala     |  18 ++--
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 104 +++++++++++++++++++++
 .../spark/sql/hudi/catalog/HoodieCatalog.scala     |   4 +-
 .../sql/hudi/catalog/HoodieInternalV2Table.scala   |  18 ++--
 5 files changed, 135 insertions(+), 30 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index fcba6e310dc..db2b93eda08 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -95,7 +95,8 @@ trait ProvidesHoodieConfig extends Logging {
    */
   def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
                               sparkSession: SparkSession,
-                              isOverwrite: Boolean,
+                              isOverwritePartition: Boolean,
+                              isOverwriteTable: Boolean,
                               insertPartitions: Map[String, Option[String]] = Map.empty,
                               extraOptions: Map[String, String]): Map[String, String] = {
 
@@ -139,24 +140,24 @@ trait ProvidesHoodieConfig extends Logging {
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
     val operation =
-      (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
-        case (true, _, _, false, _) =>
+      (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+        case (true, _, _, _, false, _) =>
           throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
-        case (true, true, _, _, true) =>
+        case (true, true, _, _, _, true) =>
           throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
-        case (true, _, true, _, _) =>
+        case (true, _, _, true, _, _) =>
           throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
             s" Please disable $INSERT_DROP_DUPS and try again.")
         // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
-        case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
         // insert overwrite table
-        case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
         // insert overwrite partition
-        case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
         // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
-        case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+        case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
         // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
-        case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
         // for the rest case, use the insert operation
         case _ => INSERT_OPERATION_OPT_VAL
       }
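
For context, the hunk above is the core of the fix: the operation match is
widened from five flags to six so that "overwrite table" and "overwrite
partition" no longer share a single isOverwrite flag. Below is a minimal
standalone Scala sketch of the resulting decision table (OperationSketch,
chooseOperation, and the shortened constant values are illustrative names,
not the Hudi API):

    object OperationSketch {
      // Illustrative stand-ins for the DataSourceWriteOptions operation values.
      val BULK_INSERT = "bulk_insert"
      val INSERT_OVERWRITE_TABLE = "insert_overwrite_table"
      val INSERT_OVERWRITE = "insert_overwrite"
      val UPSERT = "upsert"
      val INSERT = "insert"

      def chooseOperation(enableBulkInsert: Boolean,
                          isOverwritePartition: Boolean,
                          isOverwriteTable: Boolean,
                          dropDuplicate: Boolean,
                          isNonStrictMode: Boolean,
                          isPartitionedTable: Boolean,
                          hasPrecombineColumn: Boolean): String =
        (enableBulkInsert, isOverwritePartition, isOverwriteTable,
          dropDuplicate, isNonStrictMode, isPartitionedTable) match {
          case (true, _, _, _, false, _) =>
            throw new IllegalArgumentException("bulk insert on a primary-key table needs non-strict mode")
          case (true, true, _, _, _, true) =>
            throw new IllegalArgumentException("insert overwrite partition cannot use bulk insert")
          case (true, _, _, true, _, _) =>
            throw new IllegalArgumentException("bulk insert cannot drop duplicates")
          // bulk insert for insert overwrite of a non-partitioned table
          case (true, false, true, _, _, false) => BULK_INSERT
          // insert overwrite table, now regardless of partitioning
          case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE
          // insert overwrite of specific partitions
          case (_, true, false, _, _, true) => INSERT_OVERWRITE
          // strict/upsert mode with a precombine column
          case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT
          // bulk insert in non-strict mode
          case (true, _, _, _, true, _) => BULK_INSERT
          // everything else falls back to plain insert
          case _ => INSERT
        }
    }

The INSERT_OVERWRITE_TABLE case is the behavioral change: before this commit
the table-level overwrite operation was only reachable for non-partitioned
tables, so an INSERT OVERWRITE TABLE on a partitioned table always degraded
to a partition-level overwrite.
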
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 0228e5ddcf7..2e4c1db099e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -86,16 +86,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
           refreshTable: Boolean = true,
           extraOptions: Map[String, String] = Map.empty): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
 
-    // NOTE: In case of partitioned table we override specified "overwrite" parameter
-    //       to instead append to the dataset
-    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
-      SaveMode.Overwrite
+    var mode = SaveMode.Append
+    var isOverWriteTable = false
+    var isOverWritePartition = false
+    if (overwrite && catalogTable.partitionFields.isEmpty) {
+      // insert overwrite non-partition table
+      mode = SaveMode.Overwrite
+      isOverWriteTable = true
     } else {
-      SaveMode.Append
+      // for insert into or insert overwrite partition we use append mode.
+      mode = SaveMode.Append
+      isOverWritePartition = overwrite
     }
 
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
 
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
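
The call site mirrors the split: instead of deriving only a SaveMode from the
single overwrite flag, it now derives the mode and both fine-grained flags
together before building the insert config. A standalone sketch of that
derivation (WriteFlagsSketch and deriveWriteFlags are illustrative names;
SaveMode is Spark's org.apache.spark.sql.SaveMode):

    import org.apache.spark.sql.SaveMode

    object WriteFlagsSketch {
      // Returns (saveMode, isOverwritePartition, isOverwriteTable) for a
      // given user-level `overwrite` keyword and table partitioning.
      def deriveWriteFlags(overwrite: Boolean, isPartitioned: Boolean): (SaveMode, Boolean, Boolean) =
        if (overwrite && !isPartitioned) {
          // insert overwrite of a non-partitioned table: full overwrite
          (SaveMode.Overwrite, false, true)
        } else {
          // insert into, or insert overwrite of a partitioned table: append
          // mode; the overwrite semantics travel in the Hudi operation instead
          (SaveMode.Append, overwrite, false)
        }
    }
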
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 73aaabc0d8f..b6444b52b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -443,6 +444,109 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test Insert Overwrite Table for V2 Table") {
+    withSQLConf("hoodie.schema.on.read.enable" -> "true") {
+      withRecordType()(withTempDir { tmp =>
+        if (HoodieSparkUtils.gteqSpark3_2) {
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using hudi
+               | tblproperties (primaryKey = 'id', preCombineField='dt')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}/$tableName'
+        """.stripMargin)
+
+          //  Test insert overwrite table
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName
+               | values(1, 'a1', 10.0, 1000, '2021-01-05')
+         """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+
+          //  Insert overwrite table
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName
+               | values (2, 'a2', 10.0, 1000, '2021-01-06')
+                 """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
+            Seq(2, "a2", 10.0, 1000, "2021-01-06")
+          )
+
+          // Insert overwrite static partition
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName partition(dt = '2021-01-05')
+               | select * from (select 2 , 'a2', 12.0, 1000) limit 10
+                 """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 10.0, 1000, "2021-01-06")
+          )
+
+          // Insert data from another table
+          val tblNonPartition = generateTableName
+          spark.sql(
+            s"""
+               | create table $tblNonPartition (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               | ) using hudi
+               | tblproperties (primaryKey = 'id')
+               | location '${tmp.getCanonicalPath}/$tblNonPartition'
+                  """.stripMargin)
+          spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName partition(dt ='2021-01-04')
+               | select * from $tblNonPartition limit 10
+                 """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-04"),
+            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 10.0, 1000, "2021-01-06")
+          )
+
+          // Insert overwrite partitioned table, all partitions will be truncated
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName
+               | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
+                 """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName " +
+            s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
+            Seq(3, "a1", 10.0, 1000, "2021-01-04")
+          )
+
+          // Test insert overwrite non-partitioned table
+          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
+          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+            Seq(2, "a2", 10.0, 1000)
+          )
+
+          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
+          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+            Seq(2, "a2", 10.0, 2000)
+          )
+        }
+      })
+    }
+  }
+
+
   test("Test Different Type of Partition Column") {
    withRecordType()(withTempDir { tmp =>
      val typeAndValue = Seq(
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index eeef56d3cff..6d3610db21e 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -293,13 +293,13 @@ class HoodieCatalog extends DelegatingCatalogExtension
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options))
+      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, options))
       CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false)
     } else if (sourceQuery.isEmpty) {
       saveSourceDF(sourceQuery, tableDesc.properties)
       new CreateHoodieTableCommand(tableDesc, false).run(spark)
     } else {
-      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty))
+      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, Map.empty))
       new CreateHoodieTableCommand(tableDesc, false).run(spark)
     }
 
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index 9968095f3a5..b41c7456b71 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -89,15 +89,16 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                      spark: SparkSession)
   extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {
 
-  private var forceOverwrite = false
+  private var overwriteTable = false
+  private var overwritePartition = false
 
   override def truncate(): HoodieV1WriteBuilder = {
-    forceOverwrite = true
+    overwriteTable = true
     this
   }
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
-    forceOverwrite = true
+    overwritePartition = true
     this
   }
 
@@ -105,17 +106,10 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
     override def toInsertableRelation: InsertableRelation = {
       new InsertableRelation {
         override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-          val mode = if (forceOverwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-            // insert overwrite non-partition table
-            SaveMode.Overwrite
-          } else {
-            // for insert into or insert overwrite partition we use append mode.
-            SaveMode.Append
-          }
           alignOutputColumns(data).write.format("org.apache.hudi")
-            .mode(mode)
+            .mode(SaveMode.Append)
             .options(buildHoodieConfig(hoodieCatalogTable) ++
-              buildHoodieInsertConfig(hoodieCatalogTable, spark, forceOverwrite, Map.empty, Map.empty))
+              buildHoodieInsertConfig(hoodieCatalogTable, spark, overwritePartition, overwriteTable, Map.empty, Map.empty))
             .save()
         }
       }
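
This last hunk is where the two intents are first told apart. In Spark's
DataSourceV2 write API, truncate() is invoked when the overwrite condition
covers the whole table (INSERT OVERWRITE TABLE with no partition spec), while
overwrite(filters) is invoked when only matching partitions are to be
replaced; the old single forceOverwrite flag collapsed both callbacks into
one, which is why a partitioned INSERT OVERWRITE TABLE was previously handled
as a partition overwrite. A minimal standalone sketch of the new builder
state (OverwriteIntentSketch is an illustrative name; the real
HoodieV1WriteBuilder also mixes in SupportsTruncate, SupportsOverwrite and
ProvidesHoodieConfig):

    import org.apache.spark.sql.sources.Filter

    class OverwriteIntentSketch {
      private var overwriteTable = false
      private var overwritePartition = false

      // Spark calls this for a whole-table overwrite.
      def truncate(): this.type = { overwriteTable = true; this }

      // Spark calls this with the partition predicate for a partial overwrite.
      def overwrite(filters: Array[Filter]): this.type = { overwritePartition = true; this }

      // Both flags feed buildHoodieInsertConfig; the SaveMode stays Append in
      // every case because the Hudi operation now carries the overwrite.
      def currentFlags: (Boolean, Boolean) = (overwritePartition, overwriteTable)
    }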