You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/11 17:07:58 UTC
[hudi] branch master updated: [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new de42adc [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)
de42adc is described below
commit de42adc2302528541145a714078cc6d10cbb8d9a
Author: lw0090 <lw...@gmail.com>
AuthorDate: Tue Jan 12 01:07:47 2021 +0800
[HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)
---
.../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 13 ++++++-------
.../org/apache/hudi/functional/TestCOWDataSource.scala | 3 ++-
.../org/apache/hudi/functional/TestMORDataSource.scala | 1 -
3 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 472f450..4e9caa5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter {
operation = WriteOperationType.INSERT
}
- // If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE.
- // Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite
- // the table. This will replace the old fs.delete(tablepath) mode.
- if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
- operation = WriteOperationType.INSERT_OVERWRITE_TABLE
- }
-
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter {
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
+ } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
+ // When user set operation as INSERT_OVERWRITE_TABLE,
+ // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation
+ log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
+ fs.delete(tablePath, true)
+ tableExists = false
}
} else {
// Delete Operation only supports Append mode
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 730b7d2..b15a7d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -202,7 +202,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -229,6 +229,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 121957e..1ea6ceb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase {
val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
inputDF5.write.format("org.apache.hudi")
.options(commonOpts)
- .option("hoodie.compact.inline", "true")
.mode(SaveMode.Append)
.save(basePath)
val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)