You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/11 17:07:58 UTC

[hudi] branch master updated: [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new de42adc  [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)
de42adc is described below

commit de42adc2302528541145a714078cc6d10cbb8d9a
Author: lw0090 <lw...@gmail.com>
AuthorDate: Tue Jan 12 01:07:47 2021 +0800

    [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)
---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala   | 13 ++++++-------
 .../org/apache/hudi/functional/TestCOWDataSource.scala      |  3 ++-
 .../org/apache/hudi/functional/TestMORDataSource.scala      |  1 -
 3 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 472f450..4e9caa5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter {
       operation = WriteOperationType.INSERT
     }
 
-    // If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE.
-    // Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite
-    // the table. This will replace the old fs.delete(tablepath) mode.
-    if (mode == SaveMode.Overwrite && operation !=  WriteOperationType.INSERT_OVERWRITE_TABLE) {
-      operation = WriteOperationType.INSERT_OVERWRITE_TABLE
-    }
-
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(path.get)
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter {
     if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already exists.")
+      } else if (mode == SaveMode.Overwrite && tableExists && operation !=  WriteOperationType.INSERT_OVERWRITE_TABLE) {
+        // The user did not set the operation to INSERT_OVERWRITE_TABLE, so fall back to
+        // deleting the existing table path; otherwise doWriteOperation handles the overwrite.
+        log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
+        fs.delete(tablePath, true)
+        tableExists = false
       }
     } else {
       // Delete Operation only supports Append mode
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 730b7d2..b15a7d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -202,7 +202,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -229,6 +229,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 121957e..1ea6ceb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase {
     val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
     inputDF5.write.format("org.apache.hudi")
       .options(commonOpts)
-      .option("hoodie.compact.inline", "true")
       .mode(SaveMode.Append)
       .save(basePath)
     val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)