Posted to commits@hudi.apache.org by co...@apache.org on 2023/02/02 12:20:29 UTC
[hudi] branch master updated: [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1459eddb0e7 [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
1459eddb0e7 is described below
commit 1459eddb0e714459cce3fb23719154b119b582fd
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 04:20:17 2023 -0800
[HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)
* Remove `COMBINE_BEFORE_INSERT` config being overridden for insert operations
* Revisited Spark SQL feature configuration to allow a dichotomy of:
- (Feature-)specific "default" configuration (that could be overridden by the user)
- "Overriding" configuration (that could NOT be overridden by the user)
* Restoring existing behavior for Insert Into to deduplicate by default (if pre-combine is specified)
* Fixing compilation
* Fixing compilation (one more time)
* Fixing options combination ordering
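For illustration, here is a minimal Spark SQL sketch of the restored behavior (the table and values are hypothetical; running it assumes the Hudi Spark bundle is on the classpath). With a preCombineField configured, Insert Into deduplicates by default; and since COMBINE_BEFORE_INSERT is now only a "default" (not an "overriding") option, a session-level SET can switch it off:

import org.apache.spark.sql.SparkSession

// Hypothetical session; assumes the Hudi Spark bundle and its required
// confs (e.g. the Kryo serializer) are available.
object InsertIntoSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("hudi-insert-into-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  spark.sql(
    """CREATE TABLE IF NOT EXISTS t1 (id INT, name STRING, ts BIGINT)
      |USING hudi
      |TBLPROPERTIES (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)

  // Default: a pre-combine column is set, so duplicate record keys are
  // combined before insert.
  spark.sql("INSERT INTO t1 VALUES (1, 'a', 10), (1, 'b', 20)")

  // Override: COMBINE_BEFORE_INSERT is only a default after this change,
  // so a session conf can disable the combine step.
  spark.sql("SET hoodie.combine.before.insert = false")
  spark.sql("INSERT INTO t1 VALUES (2, 'c', 10), (2, 'd', 20)")
}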
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 236 +++++++++++----------
.../hudi/command/MergeIntoHoodieTableCommand.scala | 67 +++---
2 files changed, 163 insertions(+), 140 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 77ff939cf26..c8f01a12623 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, withCombinedOptions}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -60,29 +60,33 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
- withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
- Map.apply(
- "path" -> hoodieCatalogTable.tableLocation,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- PRECOMBINE_FIELD.key -> preCombineField,
- HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
- OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
- PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
- )
- }
+ val defaultOpts = Map[String, String](
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ OPERATION.key -> UPSERT_OPERATION_OPT_VAL
+ )
+
+ val overridingOpts = Map[String, String](
+ "path" -> hoodieCatalogTable.tableLocation,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ PRECOMBINE_FIELD.key -> preCombineField,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+ HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
+ )
+
+ combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
/**
@@ -107,7 +111,8 @@ trait ProvidesHoodieConfig extends Logging {
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
- val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+ val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = Map.empty, overridingOpts = extraOptions)
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)
val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
@@ -174,32 +179,38 @@ trait ProvidesHoodieConfig extends Logging {
classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
- withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
- Map(
- "path" -> path,
- TABLE_TYPE.key -> tableType,
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- OPERATION.key -> operation,
- HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- PRECOMBINE_FIELD.key -> preCombineField,
- PARTITIONPATH_FIELD.key -> partitionFieldsStr,
- PAYLOAD_CLASS_NAME.key -> payloadClassName,
- HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
- HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
- )
- }
+ val defaultOpts = Map(
+ PAYLOAD_CLASS_NAME.key -> payloadClassName,
+ // NOTE: By default, insert deduplicates records if a pre-combine column is specified
+ //       for the table
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn)
+ )
+
+ val overridingOpts = Map(
+ "path" -> path,
+ TABLE_TYPE.key -> tableType,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ OPERATION.key -> operation,
+ HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+ SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> preCombineField,
+ PARTITIONPATH_FIELD.key -> partitionFieldsStr,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+ HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+ )
+
+ combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
@@ -210,26 +221,27 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
- withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
- Map(
- "path" -> hoodieCatalogTable.tableLocation,
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
- OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
- PARTITIONS_TO_DELETE.key -> partitionsToDrop,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
- PARTITIONPATH_FIELD.key -> partitionFields,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
- HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
- )
- }
+ val overridingOpts = Map(
+ "path" -> hoodieCatalogTable.tableLocation,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+ PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+ HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+ )
+
+ combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = Map.empty, overridingOpts = overridingOpts)
}
def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
@@ -245,35 +257,37 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
- withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
- Map(
- "path" -> path,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- TBL_NAME.key -> tableConfig.getTableName,
- HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
- OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
- PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
- HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
- )
- }
+ val overridingOpts = Map(
+ "path" -> path,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ TBL_NAME.key -> tableConfig.getTableName,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+ HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+ )
+
+ combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = Map.empty, overridingOpts = overridingOpts)
}
def buildHiveSyncConfig(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
tableConfig: HoodieTableConfig,
extraOptions: Map[String, String] = Map.empty): HiveSyncConfig = {
- val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+ val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = Map.empty, overridingOpts = extraOptions)
val props = new TypedProperties(toProperties(combinedOpts))
// Enable the hive sync by default if Spark has enabled the hive metastore.
@@ -304,19 +318,22 @@ trait ProvidesHoodieConfig extends Logging {
object ProvidesHoodieConfig {
- def filterNullValues(opts: Map[String, String]): Map[String, String] =
- opts.filter { case (_, v) => v != null }
-
- def withCombinedOptions(catalogTable: HoodieCatalogTable,
- tableConfig: HoodieTableConfig,
- sqlConf: SQLConf)(optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
- combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
- }
-
- private def combineOptions(catalogTable: HoodieCatalogTable,
+ // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ //
+ // Spark SQL operation configuration might come from a variety of diverse sources
+ // that ultimately have to be combined under a clear and consistent process:
+ //
+ // - Default: specify default values preferred by the feature/component (could be
+ // overridden by any source)
+ //
+ // - Overriding: specify mandatory values required for the feature/component (could NOT be
+ // overridden by any source)
+ //
+ def combineOptions(catalogTable: HoodieCatalogTable,
tableConfig: HoodieTableConfig,
sqlConf: SQLConf,
- optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
+ defaultOpts: Map[String, String],
+ overridingOpts: Map[String, String] = Map.empty): Map[String, String] = {
// NOTE: Properties are merged in the following order of priority (first has the highest priority, last has the
// lowest, which is inverse to the ordering in the code):
// 1. (Extra) Option overrides
@@ -324,15 +341,20 @@ object ProvidesHoodieConfig {
// 3. Persisted Hudi's Table configs
// 4. Table's properties in Spark Catalog
// 5. Global DFS properties
- DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+ // 6. (Feature-specific) Default values
+ filterNullValues(defaultOpts) ++
+ DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
// NOTE: Catalog table options provided through the `TBLPROPERTIES` clause might contain Spark SQL specific
// properties that need to be mapped into Hudi's conventional ones
mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
tableConfig.getProps.asScala.toMap ++
filterHoodieConfigs(sqlConf.getAllConfs) ++
- filterNullValues(optionOverrides)
+ filterNullValues(overridingOpts)
}
+ private def filterNullValues(opts: Map[String, String]): Map[String, String] =
+ opts.filter { case (_, v) => v != null }
+
private def filterHoodieConfigs(opts: Map[String, String]): Map[String, String] =
opts.filterKeys(isHoodieConfigKey)
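To make the merge precedence documented in combineOptions concrete, here is a self-contained approximation (not the actual Hudi implementation: the real sources, i.e. DFS global properties, catalog properties, persisted table configs, and session SQL confs, are stubbed as plain maps). Scala's ++ keeps the right-hand entry on a key clash, so maps listed later take priority:

// Approximation of the six-level merge above; config keys are real Hudi keys
// used illustratively, and all sources are stubbed as plain maps.
object CombineOptionsSketch extends App {
  private def filterNullValues(opts: Map[String, String]): Map[String, String] =
    opts.filter { case (_, v) => v != null }

  val defaultOpts    = Map("hoodie.combine.before.insert" -> "true")        // 6. feature defaults
  val globalProps    = Map.empty[String, String]                            // 5. global DFS properties
  val catalogProps   = Map.empty[String, String]                            // 4. catalog TBLPROPERTIES
  val tableConfigs   = Map("hoodie.table.name" -> "t1")                     // 3. persisted table configs
  val sqlConfs       = Map("hoodie.combine.before.insert" -> "false")       // 2. session SQL confs
  val overridingOpts = Map("hoodie.datasource.write.operation" -> "upsert") // 1. option overrides

  val combined =
    filterNullValues(defaultOpts) ++ globalProps ++ catalogProps ++
      tableConfigs ++ sqlConfs ++ filterNullValues(overridingOpts)

  // The session conf overrides the feature default...
  assert(combined("hoodie.combine.before.insert") == "false")
  // ...while the overriding option cannot be displaced by any source.
  assert(combined("hoodie.datasource.write.operation") == "upsert")
}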
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 93972b392b2..ed3f2591253 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -504,38 +504,39 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
- withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
- Map(
- "path" -> path,
- RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
- PRECOMBINE_FIELD.key -> preCombineField,
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
- HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
- HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
- PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
-
- // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
- // as schema of the incoming dataset might be diverging from the table's schema (full schemas'
- // compatibility b/w table's schema and incoming one is not necessary in this case since we can
- // be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
- // target table, ie partially updating)
- AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
- RECONCILE_SCHEMA.key -> "false",
- CANONICALIZE_NULLABLE.key -> "false"
- )
- }
+ val overridingOpts = Map(
+ "path" -> path,
+ RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+ PRECOMBINE_FIELD.key -> preCombineField,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
+ HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+ SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+ PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+
+ // NOTE: We have to explicitly override the following configs to make sure no schema validation
+ //       is performed, as the schema of the incoming dataset might diverge from the table's schema
+ //       (full compatibility b/w the table's schema and the incoming one is not necessary in this
+ //       case, since we may be cherry-picking only selected columns from the incoming dataset to be
+ //       inserted/updated in the target table, i.e. partially updating)
+ AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+ RECONCILE_SCHEMA.key -> "false",
+ CANONICALIZE_NULLABLE.key -> "false"
+ )
+
+ combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+ defaultOpts = Map.empty, overridingOpts = overridingOpts)
}
}
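Finally, a note on the "overriding" semantics in the MERGE INTO path above: the schema-validation switches sit in overridingOpts, so even a session-level SET cannot re-enable them. A minimal sketch (the key strings are assumed to match AVRO_SCHEMA_VALIDATE_ENABLE and RECONCILE_SCHEMA; the scenario is hypothetical):

// User confs merge at a lower precedence than overridingOpts, so the
// mandated "false" values win regardless of session settings.
object MergeIntoOverrideSketch extends App {
  val sessionConfs = Map("hoodie.avro.schema.validate" -> "true") // user attempt via SET
  val overridingOpts = Map(
    "hoodie.avro.schema.validate" -> "false",
    "hoodie.datasource.write.reconcile.schema" -> "false"
  )

  val combined = sessionConfs ++ overridingOpts
  assert(combined("hoodie.avro.schema.validate") == "false")
  assert(combined("hoodie.datasource.write.reconcile.schema") == "false")
}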