You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@hudi.apache.org by le...@apache.org on 2023/02/01 05:54:43 UTC
[hudi] branch master updated: [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a9a6d20471 [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
0a9a6d20471 is described below
commit 0a9a6d2047152f74152907effc3db3a66435fa16
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Feb 1 13:54:37 2023 +0800
[HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
---
.../command/InsertIntoHoodieTableCommand.scala | 4 +-
.../apache/spark/sql/hudi/TestInsertTable.scala | 120 ++-------------------
2 files changed, 12 insertions(+), 112 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2e4c1db099e..f07611ad019 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -90,8 +90,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
var mode = SaveMode.Append
var isOverWriteTable = false
var isOverWritePartition = false
- if (overwrite && catalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
+ if (overwrite && partitionSpec.isEmpty) {
+ // insert overwrite table
mode = SaveMode.Overwrite
isOverWriteTable = true
} else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a227a20b8b6..b092a68e20d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -369,7 +369,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
- // Insert overwrite dynamic partition
+
+ // Insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
@@ -379,14 +380,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- // Insert overwrite dynamic partition
+ // Insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)
@@ -433,122 +433,22 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName " +
s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
- Seq(2, "a2", 12.0, 1000, "2021-01-05"),
- Seq(2, "a2", 10.0, 1000, "2021-01-06"),
Seq(3, "a1", 10.0, 1000, "2021-01-04")
)
- // test insert overwrite non-partitioned table
+ // Test insert overwrite non-partitioned table
spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 1000)
)
- })
- }
- test("Test Insert Overwrite Table for V2 Table") {
- withSQLConf("hoodie.schema.on.read.enable" -> "true") {
- withRecordType()(withTempDir { tmp =>
- if (HoodieSparkUtils.gteqSpark3_2) {
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long,
- | dt string
- |) using hudi
- | tblproperties (primaryKey = 'id', preCombineField='dt')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}/$tableName'
- """.stripMargin)
-
- // Test insert overwrite table
- spark.sql(
- s"""
- | insert overwrite table $tableName
- | values(1, 'a1', 10.0, 1000, '2021-01-05')
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
-
- // Insert overwrite table
- spark.sql(
- s"""
- | insert overwrite table $tableName
- | values (2, 'a2', 10.0, 1000, '2021-01-06')
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
- Seq(2, "a2", 10.0, 1000, "2021-01-06")
- )
-
- // Insert overwrite static partition
- spark.sql(
- s"""
- | insert overwrite table $tableName partition(dt = '2021-01-05')
- | select * from (select 2 , 'a2', 12.0, 1000) limit 10
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
- Seq(2, "a2", 12.0, 1000, "2021-01-05"),
- Seq(2, "a2", 10.0, 1000, "2021-01-06")
- )
-
- // Insert data from another table
- val tblNonPartition = generateTableName
- spark.sql(
- s"""
- | create table $tblNonPartition (
- | id int,
- | name string,
- | price double,
- | ts long
- | ) using hudi
- | tblproperties (primaryKey = 'id')
- | location '${tmp.getCanonicalPath}/$tblNonPartition'
- """.stripMargin)
- spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
- spark.sql(
- s"""
- | insert overwrite table $tableName partition(dt ='2021-01-04')
- | select * from $tblNonPartition limit 10
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
- Seq(1, "a1", 10.0, 1000, "2021-01-04"),
- Seq(2, "a2", 12.0, 1000, "2021-01-05"),
- Seq(2, "a2", 10.0, 1000, "2021-01-06")
- )
-
- // Insert overwrite partitioned table, all partitions will be truncated
- spark.sql(
- s"""
- | insert overwrite table $tableName
- | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName " +
- s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
- Seq(3, "a1", 10.0, 1000, "2021-01-04")
- )
-
- // Test insert overwrite non-partitioned table
- spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
- checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
- Seq(2, "a2", 10.0, 1000)
- )
-
- spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
- checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
- Seq(2, "a2", 10.0, 2000)
- )
- }
- })
- }
+ spark.sql(s"insert overwrite table $tblNonPartition select 3, 'a3', 10, 1000")
+ checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+ })
}
-
test("Test Different Type of Partition Column") {
withRecordType()(withTempDir { tmp =>
val typeAndValue = Seq(
@@ -666,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
- checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
+ checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")(
"Insert Overwrite Partition can not use bulk insert."
)
}