You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink in the HTML archive and is not reproduced in this plain text version.
Posted to commits@hudi.apache.org by le...@apache.org on 2023/02/01 05:54:43 UTC

[hudi] branch master updated: [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a9a6d20471 [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
0a9a6d20471 is described below

commit 0a9a6d2047152f74152907effc3db3a66435fa16
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Feb 1 13:54:37 2023 +0800

    [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
---
 .../command/InsertIntoHoodieTableCommand.scala     |   4 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 120 ++-------------------
 2 files changed, 12 insertions(+), 112 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2e4c1db099e..f07611ad019 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -90,8 +90,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     var mode = SaveMode.Append
     var isOverWriteTable = false
     var isOverWritePartition = false
-    if (overwrite && catalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+    if (overwrite && partitionSpec.isEmpty) {
+      // insert overwrite table
       mode = SaveMode.Overwrite
       isOverWriteTable = true
     } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a227a20b8b6..b092a68e20d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -369,7 +369,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
            | partitioned by (dt)
            | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
-      //  Insert overwrite dynamic partition
+
+      //  Insert overwrite table
       spark.sql(
         s"""
            | insert overwrite table $tableName
@@ -379,14 +380,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(1, "a1", 10.0, 1000, "2021-01-05")
       )
 
-      //  Insert overwrite dynamic partition
+      //  Insert overwrite table
       spark.sql(
         s"""
            | insert overwrite table $tableName
            | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt
         """.stripMargin)
       checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
         Seq(2, "a2", 10.0, 1000, "2021-01-06")
       )
 
@@ -433,122 +433,22 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         """.stripMargin)
       checkAnswer(s"select id, name, price, ts, dt from $tableName " +
         s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
-        Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-        Seq(2, "a2", 10.0, 1000, "2021-01-06"),
         Seq(3, "a1", 10.0, 1000, "2021-01-04")
       )
 
-      // test insert overwrite non-partitioned table
+      // Test insert overwrite non-partitioned table
       spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000")
       checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
         Seq(2, "a2", 10.0, 1000)
       )
-    })
-  }
 
-  test("Test Insert Overwrite Table for V2 Table") {
-    withSQLConf("hoodie.schema.on.read.enable" -> "true") {
-      withRecordType()(withTempDir { tmp =>
-        if (HoodieSparkUtils.gteqSpark3_2) {
-          val tableName = generateTableName
-          // Create a partitioned table
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long,
-               |  dt string
-               |) using hudi
-               | tblproperties (primaryKey = 'id', preCombineField='dt')
-               | partitioned by (dt)
-               | location '${tmp.getCanonicalPath}/$tableName'
-        """.stripMargin)
-
-          //  Test insert overwrite table
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | values(1, 'a1', 10.0, 1000, '2021-01-05')
-         """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1", 10.0, 1000, "2021-01-05")
-          )
-
-          //  Insert overwrite table
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | values (2, 'a2', 10.0, 1000, '2021-01-06')
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert overwrite static partition
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName partition(dt = '2021-01-05')
-               | select * from (select 2 , 'a2', 12.0, 1000) limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
-            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert data from another table
-          val tblNonPartition = generateTableName
-          spark.sql(
-            s"""
-               | create table $tblNonPartition (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long
-               | ) using hudi
-               | tblproperties (primaryKey = 'id')
-               | location '${tmp.getCanonicalPath}/$tblNonPartition'
-                  """.stripMargin)
-          spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName partition(dt ='2021-01-04')
-               | select * from $tblNonPartition limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
-            Seq(1, "a1", 10.0, 1000, "2021-01-04"),
-            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert overwrite partitioned table, all partitions will be truncated
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName " +
-            s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
-            Seq(3, "a1", 10.0, 1000, "2021-01-04")
-          )
-
-          // Test insert overwrite non-partitioned table
-          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
-          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
-            Seq(2, "a2", 10.0, 1000)
-          )
-
-          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
-          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
-            Seq(2, "a2", 10.0, 2000)
-          )
-        }
-      })
-    }
+      spark.sql(s"insert overwrite table $tblNonPartition select 3, 'a3', 10, 1000")
+      checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+        Seq(3, "a3", 10.0, 1000)
+      )
+    })
   }
 
-
   test("Test Different Type of Partition Column") {
    withRecordType()(withTempDir { tmp =>
      val typeAndValue = Seq(
@@ -666,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
              | tblproperties (primaryKey = 'id')
              | partitioned by (dt)
        """.stripMargin)
-        checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
+        checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")(
           "Insert Overwrite Partition can not use bulk insert."
         )
       }