Posted to commits@hudi.apache.org by co...@apache.org on 2023/02/02 12:20:29 UTC

[hudi] branch master updated: [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1459eddb0e7 [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
1459eddb0e7 is described below

commit 1459eddb0e714459cce3fb23719154b119b582fd
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 04:20:17 2023 -0800

    [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
    
    * Remove `COMBINE_BEFORE_INSERT` config being overridden for insert operations
    
    * Revisited Spark SQL feature configuration to allow a dichotomy of:
      - (Feature-)specific "default" configuration (that could be overridden by the user)
      - "Overriding" configuration (that could NOT be overridden by the user)
    
    * Restoring the existing behavior of Insert Into deduplicating by default (if a pre-combine field is specified)
    
    * Fixing compilation
    
    * Fixing compilation (one more time)
    
    * Fixing options combination ordering
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 236 +++++++++++----------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  67 +++---
 2 files changed, 163 insertions(+), 140 deletions(-)
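
For illustration, the option-combination contract introduced by this change can be
sketched in plain Scala (this snippet is not part of the commit; the option values
are hypothetical):

    // Later maps win on key conflicts in Scala's `++`, so feature defaults sit at
    // the lowest priority and mandatory overrides at the highest
    val defaultOpts    = Map("hoodie.datasource.write.operation" -> "upsert") // may be overridden
    val userOpts       = Map("hoodie.datasource.write.operation" -> "insert") // user-provided
    val overridingOpts = Map("hoodie.table.name" -> "t1")                     // may NOT be overridden

    val combined = defaultOpts ++ userOpts ++ overridingOpts
    // combined("hoodie.datasource.write.operation") == "insert" -- the user wins over the default
    // combined("hoodie.table.name") == "t1"                     -- the override always applies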

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 77ff939cf26..c8f01a12623 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, withCombinedOptions}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -60,29 +60,33 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map.apply(
-        "path" -> hoodieCatalogTable.tableLocation,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map[String, String](
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      OPERATION.key -> UPSERT_OPERATION_OPT_VAL
+    )
+
+    val overridingOpts = Map[String, String](
+      "path" -> hoodieCatalogTable.tableLocation,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   /**
@@ -107,7 +111,8 @@ trait ProvidesHoodieConfig extends Logging {
     val tableType = hoodieCatalogTable.tableTypeName
     val tableConfig = hoodieCatalogTable.tableConfig
 
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)
 
     val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
@@ -174,32 +179,38 @@ trait ProvidesHoodieConfig extends Logging {
       classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        TABLE_TYPE.key -> tableType,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        OPERATION.key -> operation,
-        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> preCombineField,
-        PARTITIONPATH_FIELD.key -> partitionFieldsStr,
-        PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map(
+      PAYLOAD_CLASS_NAME.key -> payloadClassName,
+      // NOTE: By default, insert will attempt to deduplicate records when a pre-combine column
+      //       is specified for the table
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn)
+    )
+
+    val overridingOpts = Map(
+      "path" -> path,
+      TABLE_TYPE.key -> tableType,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      OPERATION.key -> operation,
+      HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> preCombineField,
+      PARTITIONPATH_FIELD.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
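
Because COMBINE_BEFORE_INSERT is now supplied through defaultOpts rather than forced,
a session-level config can disable combine-on-insert even for a table that has a
pre-combine field. A minimal sketch (hypothetical table and values; `spark` is assumed
to be an active SparkSession):

    // The SQL conf sits above feature defaults in the merge order, so it takes effect
    spark.sql("set hoodie.combine.before.insert = false")
    spark.sql("insert into h1 values (1, 'a1', 10.0, 1000)")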
@@ -210,26 +221,27 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> hoodieCatalogTable.tableLocation,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
-        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> hoodieCatalogTable.tableLocation,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+      PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
@@ -245,35 +257,37 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> tableConfig.getTableName,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> tableConfig.getTableName,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHiveSyncConfig(sparkSession: SparkSession,
                           hoodieCatalogTable: HoodieCatalogTable,
                           tableConfig: HoodieTableConfig,
                           extraOptions: Map[String, String] = Map.empty): HiveSyncConfig = {
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val props = new TypedProperties(toProperties(combinedOpts))
 
     // Enable hive sync by default if Spark has the Hive metastore enabled.
@@ -304,19 +318,22 @@ trait ProvidesHoodieConfig extends Logging {
 
 object ProvidesHoodieConfig {
 
-  def filterNullValues(opts: Map[String, String]): Map[String, String] =
-    opts.filter { case (_, v) => v != null }
-
-  def withCombinedOptions(catalogTable: HoodieCatalogTable,
-                          tableConfig: HoodieTableConfig,
-                          sqlConf: SQLConf)(optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
-    combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
-  }
-
-  private def combineOptions(catalogTable: HoodieCatalogTable,
+  // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+  //
+  // Spark SQL operation configuration might come from a variety of sources that ultimately
+  // have to be combined through a clear and consistent process:
+  //
+  //    - Default: specifies default values preferred by the feature/component (which could be
+  //      overridden by any source)
+  //
+  //    - Overriding: specifies mandatory values required by the feature/component (which could
+  //      NOT be overridden by any source)
+  //
+  def combineOptions(catalogTable: HoodieCatalogTable,
                              tableConfig: HoodieTableConfig,
                              sqlConf: SQLConf,
-                             optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
+                             defaultOpts: Map[String, String],
+                             overridingOpts: Map[String, String] = Map.empty): Map[String, String] = {
     // NOTE: Properties are merged in the following order of priority (first has the highest priority, last has the
     //       lowest, which is inverse to the ordering in the code):
     //          1. (Extra) Option overrides
@@ -324,15 +341,20 @@ object ProvidesHoodieConfig {
     //          3. Persisted Hudi's Table configs
     //          4. Table's properties in Spark Catalog
     //          5. Global DFS properties
-    DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+    //          6. (Feature-specific) Default values
+    filterNullValues(defaultOpts) ++
+      DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
       // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain Spark SQL specific
       //       properties that need to be mapped into Hudi's conventional ones
       mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
       tableConfig.getProps.asScala.toMap ++
       filterHoodieConfigs(sqlConf.getAllConfs) ++
-      filterNullValues(optionOverrides)
+      filterNullValues(overridingOpts)
   }
 
+  private def filterNullValues(opts: Map[String, String]): Map[String, String] =
+    opts.filter { case (_, v) => v != null }
+
   private def filterHoodieConfigs(opts: Map[String, String]): Map[String, String] =
     opts.filterKeys(isHoodieConfigKey)
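
One property of the merge worth calling out: null-valued entries are filtered out before
combining, so a null option falls through to the next source in the priority chain rather
than clobbering it. A standalone sketch (plain Scala, not part of the commit; the key is
Hudi's, the values are hypothetical):

    def filterNullValues(opts: Map[String, String]): Map[String, String] =
      opts.filter { case (_, v) => v != null }

    // A null override is dropped, so the lower-priority value survives
    val merged = Map("hoodie.datasource.write.precombine.field" -> "ts") ++
      filterNullValues(Map[String, String]("hoodie.datasource.write.precombine.field" -> null))
    // merged("hoodie.datasource.write.precombine.field") == "ts"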
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 93972b392b2..ed3f2591253 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -504,38 +504,39 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
-        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
-
-        // NOTE: We have to explicitly override the following configs to make sure no schema validation is
-        //       performed, as the schema of the incoming dataset might diverge from the table's schema (full
-        //       compatibility between the table's schema and the incoming one is not necessary in this case,
-        //       since we can be cherry-picking only selected columns from the incoming dataset to be
-        //       inserted/updated in the target table, i.e. partially updating)
-        AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
-        RECONCILE_SCHEMA.key -> "false",
-        CANONICALIZE_NULLABLE.key -> "false"
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+
+      // NOTE: We have to explicitly override the following configs to make sure no schema validation is
+      //       performed, as the schema of the incoming dataset might diverge from the table's schema (full
+      //       compatibility between the table's schema and the incoming one is not necessary in this case,
+      //       since we can be cherry-picking only selected columns from the incoming dataset to be
+      //       inserted/updated in the target table, i.e. partially updating)
+      AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+      RECONCILE_SCHEMA.key -> "false",
+      CANONICALIZE_NULLABLE.key -> "false"
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 }
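
The schema-validation overrides in the map above exist because MERGE INTO may legitimately
update only a subset of the target table's columns. A minimal sketch (hypothetical tables
and columns; `spark` is assumed to be an active SparkSession):

    // Partial update: the source projection covers only (id, price); full-schema
    // compatibility checks against the target table would reject it if left enabled
    spark.sql(
      """merge into h1 t
        |using (select 1 as id, 11.0 as price) s
        |on t.id = s.id
        |when matched then update set price = s.price
        |""".stripMargin)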