Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 17:54:13 UTC

[hudi] branch release-0.13.0 updated (40d534e878b -> 43373a7187c)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 40d534e878b [HUDI-5647] Automate savepoint and restore tests (#7796)
     new 91a9c821d2f [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
     new 43373a7187c [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   4 +-
 .../org/apache/spark/sql/HoodieUnsafeUtils.scala   |  20 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 236 +++++++++++----------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  67 +++---
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  66 ++++++
 5 files changed, 248 insertions(+), 145 deletions(-)


[hudi] 02/02: [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 43373a7187ca82590817d77e06c79de9b385de66
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Thu Feb 2 11:50:48 2023 -0500

    [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)
    
    `deduceShuffleParallelism` could return 0 in some situations, which should never happen.
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  4 +-
 .../org/apache/spark/sql/HoodieUnsafeUtils.scala   | 20 ++++++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 66 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 5 deletions(-)
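
For illustration, a minimal standalone sketch of the approach the fix takes (not the patched helper itself): prefer the backing RDD's partition count whenever the physical plan only reports a stubbed UnknownPartitioning. It assumes a Spark 3.x build where LogicalRDD is accessible, and reads its fields by name to stay independent of the exact constructor arity.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
    import org.apache.spark.sql.execution.LogicalRDD

    // DataFrames built directly from an RDD keep a stubbed UnknownPartitioning in their
    // plan (so its numPartitions can be 0); fall back to the RDD itself in that case.
    def numPartitionsOf(df: DataFrame): Int = df.queryExecution.logical match {
      case r: LogicalRDD if r.outputPartitioning.isInstanceOf[UnknownPartitioning] =>
        r.rdd.getNumPartitions
      case _ =>
        df.queryExecution.executedPlan.outputPartitioning.numPartitions
    }

    // e.g. (illustrative): for a DataFrame created via spark.createDataFrame(rdd, schema)
    // where rdd has 4 partitions, numPartitionsOf returns 4 rather than the 0 that the
    // old outputPartitioning-based lookup could yield.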

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 7e235993e33..a6488b07b51 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -34,7 +34,7 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
-import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
+import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -45,7 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
 
 object HoodieDatasetBulkInsertHelper
-  extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getOutputPartitioning(df).numPartitions)) with Logging {
+  extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getNumPartitions(df))) with Logging {
 
   /**
    * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index dfd416b6f52..ee22f714c9c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -22,7 +22,8 @@ import org.apache.hudi.HoodieUnsafeRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 
@@ -38,8 +39,21 @@ object HoodieUnsafeUtils {
    *       but instead will just execute Spark resolution, optimization and actual execution planning stages
    *       returning instance of [[SparkPlan]] ready for execution
    */
-  def getOutputPartitioning(df: DataFrame): Partitioning =
-    df.queryExecution.executedPlan.outputPartitioning
+  def getNumPartitions(df: DataFrame): Int = {
+    // NOTE: In general we'd rely on [[outputPartitioning]] of the executable [[SparkPlan]] to determine
+    //       number of partitions plan is going to be executed with.
+    //       However in case of [[LogicalRDD]] plan's output-partitioning will be stubbed as [[UnknownPartitioning]]
+    //       and therefore we will be falling back to determine number of partitions by looking at the RDD itself
+    df.queryExecution.logical match {
+      case LogicalRDD(_, rdd, outputPartitioning, _, _) =>
+        outputPartitioning match {
+          case _: UnknownPartitioning => rdd.getNumPartitions
+          case _ => outputPartitioning.numPartitions
+        }
+
+      case _ => df.queryExecution.executedPlan.outputPartitioning.numPartitions
+    }
+  }
 
   /**
    * Creates [[DataFrame]] from provided [[plan]]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b092a68e20d..b33deebdf72 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -26,9 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType
 import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
+import org.scalatest.Inspectors.forAll
 
 import java.io.File
 
@@ -1059,4 +1061,68 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  /**
+   * This test is to make sure that bulk insert doesn't create a bunch of tiny files if
+   * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
+   *
+   * NOTE: Additionally, this test serves as a smoke test making sure that all of the bulk-insert
+   *       modes work
+   */
+  test(s"Test Bulk Insert with all sort-modes") {
+    withTempDir { basePath =>
+      BulkInsertSortMode.values().foreach { sortMode =>
+        val tableName = generateTableName
+        // Remove these with [HUDI-5419]
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
+        spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
+        // Default parallelism is 200 which means in global sort, each record will end up in a different spark partition so
+        // 9 files would be created. Setting parallelism to 3 so that each spark partition will contain a hudi partition.
+        val parallelism = if (sortMode.name.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
+          "hoodie.bulkinsert.shuffle.parallelism = 3,"
+        } else {
+          ""
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  dt string
+             |) using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'name',
+             |  type = 'cow',
+             |  $parallelism
+             |  hoodie.bulkinsert.sort.mode = '${sortMode.name}'
+             | )
+             | partitioned by (dt)
+             | location '${basePath.getCanonicalPath}/$tableName'
+                """.stripMargin)
+
+        spark.sql("set hoodie.sql.bulk.insert.enable = true")
+        spark.sql("set hoodie.sql.insert.mode = non-strict")
+
+        spark.sql(
+          s"""insert into $tableName  values
+             |(5, 'a', 35, '2021-05-21'),
+             |(1, 'a', 31, '2021-01-21'),
+             |(3, 'a', 33, '2021-03-21'),
+             |(4, 'b', 16, '2021-05-21'),
+             |(2, 'b', 18, '2021-01-21'),
+             |(6, 'b', 17, '2021-03-21'),
+             |(8, 'a', 21, '2021-05-21'),
+             |(9, 'a', 22, '2021-01-21'),
+             |(7, 'a', 23, '2021-03-21')
+             |""".stripMargin)
+
+        // TODO re-enable
+        //assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from $tableName").count())
+      }
+    }
+  }
 }


[hudi] 01/02: [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 91a9c821d2f04bd2e3233930eb8db9467dc12b26
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 04:20:17 2023 -0800

    [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
    
    * Remove `COMBINE_BEFORE_INSERT` config being overridden for insert operations
    
    * Revisited Spark SQL feature configuration to allow dichotomy of having:
      - (Feature-)specific "default" configuration (that could be overridden by the user)
      - "Overriding" configuration (that could NOT be overridden by the user)
    
    * Restoring existing behavior for Insert Into to deduplicate by default (if pre-combine is specified)
    
    * Fixing compilation
    
    * Fixing compilation (one more time)
    
    * Fixing options combination ordering
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 236 +++++++++++----------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  67 +++---
 2 files changed, 163 insertions(+), 140 deletions(-)
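
For illustration, a minimal sketch of the merge order this commit introduces, using plain Scala maps: "default" options seed the bottom of the precedence chain and lose to anything the user sets, while "overriding" options sit at the top and cannot be displaced. The combine wrapper and its parameters below are illustrative only, not Hudi's combineOptions signature.

    // Later operands of ++ win on key conflicts, so feature defaults sit at the
    // bottom of the chain and mandatory overrides have the final say.
    def combine(defaults: Map[String, String],
                sessionConf: Map[String, String],
                overrides: Map[String, String]): Map[String, String] =
      defaults ++ sessionConf ++ overrides

    val merged = combine(
      defaults    = Map("hoodie.combine.before.insert" -> "true"),   // dedup by default when a pre-combine field exists
      sessionConf = Map("hoodie.combine.before.insert" -> "false"),  // user can override the default
      overrides   = Map("hoodie.datasource.write.operation" -> "bulk_insert"))
    // merged("hoodie.combine.before.insert")      == "false"        (user setting beats the default)
    // merged("hoodie.datasource.write.operation") == "bulk_insert"  (overriding option is final)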

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 77ff939cf26..c8f01a12623 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, withCombinedOptions}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -60,29 +60,33 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map.apply(
-        "path" -> hoodieCatalogTable.tableLocation,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map[String, String](
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      OPERATION.key -> UPSERT_OPERATION_OPT_VAL
+    )
+
+    val overridingOpts = Map[String, String](
+      "path" -> hoodieCatalogTable.tableLocation,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   /**
@@ -107,7 +111,8 @@ trait ProvidesHoodieConfig extends Logging {
     val tableType = hoodieCatalogTable.tableTypeName
     val tableConfig = hoodieCatalogTable.tableConfig
 
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)
 
     val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
@@ -174,32 +179,38 @@ trait ProvidesHoodieConfig extends Logging {
       classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        TABLE_TYPE.key -> tableType,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        OPERATION.key -> operation,
-        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> preCombineField,
-        PARTITIONPATH_FIELD.key -> partitionFieldsStr,
-        PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map(
+      PAYLOAD_CLASS_NAME.key -> payloadClassName,
+      // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
+      //       for the table
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn)
+    )
+
+    val overridingOpts = Map(
+      "path" -> path,
+      TABLE_TYPE.key -> tableType,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      OPERATION.key -> operation,
+      HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> preCombineField,
+      PARTITIONPATH_FIELD.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
@@ -210,26 +221,27 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> hoodieCatalogTable.tableLocation,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
-        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> hoodieCatalogTable.tableLocation,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+      PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
@@ -245,35 +257,37 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> tableConfig.getTableName,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> tableConfig.getTableName,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHiveSyncConfig(sparkSession: SparkSession,
                           hoodieCatalogTable: HoodieCatalogTable,
                           tableConfig: HoodieTableConfig,
                           extraOptions: Map[String, String] = Map.empty): HiveSyncConfig = {
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val props = new TypedProperties(toProperties(combinedOpts))
 
     // Enable the hive sync by default if spark have enable the hive metastore.
@@ -304,19 +318,22 @@ trait ProvidesHoodieConfig extends Logging {
 
 object ProvidesHoodieConfig {
 
-  def filterNullValues(opts: Map[String, String]): Map[String, String] =
-    opts.filter { case (_, v) => v != null }
-
-  def withCombinedOptions(catalogTable: HoodieCatalogTable,
-                          tableConfig: HoodieTableConfig,
-                          sqlConf: SQLConf)(optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
-    combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
-  }
-
-  private def combineOptions(catalogTable: HoodieCatalogTable,
+  // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+  //
+  // Spark SQL operations configuration might be coming from a variety of diverse sources
+  // that have to be ultimately combined under clear and consistent process:
+  //
+  //    - Default: specify default values preferred by the feature/component (could be
+  //      overridden by any source)
+  //
+  //    - Overriding: specify mandatory values required for the feature/component (could NOT be
+  //      overridden by any source)
+  //
+  def combineOptions(catalogTable: HoodieCatalogTable,
                              tableConfig: HoodieTableConfig,
                              sqlConf: SQLConf,
-                             optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
+                             defaultOpts: Map[String, String],
+                             overridingOpts: Map[String, String] = Map.empty): Map[String, String] = {
     // NOTE: Properties are merged in the following order of priority (first has the highest priority, last has the
     //       lowest, which is inverse to the ordering in the code):
     //          1. (Extra) Option overrides
@@ -324,15 +341,20 @@ object ProvidesHoodieConfig {
     //          3. Persisted Hudi's Table configs
     //          4. Table's properties in Spark Catalog
     //          5. Global DFS properties
-    DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+    //          6. (Feature-specific) Default values
+    filterNullValues(defaultOpts) ++
+      DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
       // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain Spark SQL specific
       //       properties that need to be mapped into Hudi's conventional ones
       mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
       tableConfig.getProps.asScala.toMap ++
       filterHoodieConfigs(sqlConf.getAllConfs) ++
-      filterNullValues(optionOverrides)
+      filterNullValues(overridingOpts)
   }
 
+  private def filterNullValues(opts: Map[String, String]): Map[String, String] =
+    opts.filter { case (_, v) => v != null }
+
   private def filterHoodieConfigs(opts: Map[String, String]): Map[String, String] =
     opts.filterKeys(isHoodieConfigKey)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 93972b392b2..ed3f2591253 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -504,38 +504,39 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
-        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
-
-        // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
-        //       as schema of the incoming dataset might be diverging from the table's schema (full schemas'
-        //       compatibility b/w table's schema and incoming one is not necessary in this case since we can
-        //       be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
-        //       target table, ie partially updating)
-        AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
-        RECONCILE_SCHEMA.key -> "false",
-        CANONICALIZE_NULLABLE.key -> "false"
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+
+      // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
+      //       as schema of the incoming dataset might be diverging from the table's schema (full schemas'
+      //       compatibility b/w table's schema and incoming one is not necessary in this case since we can
+      //       be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
+      //       target table, ie partially updating)
+      AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+      RECONCILE_SCHEMA.key -> "false",
+      CANONICALIZE_NULLABLE.key -> "false"
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 }