Posted to commits@hudi.apache.org by si...@apache.org on 2023/03/03 15:09:14 UTC

[hudi] branch master updated: [HUDI-5665] Adding support to re-use table configs (#7901)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cfe490fcb23 [HUDI-5665] Adding support to re-use table configs (#7901)
cfe490fcb23 is described below

commit cfe490fcb2333049b4f47a2d1d241b07e12d42c1
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Mar 3 07:09:03 2023 -0800

    [HUDI-5665] Adding support to re-use table configs (#7901)
    
    - As of now, we expect users to set some of the mandatory fields on every write, for example record keys, partition path, etc. These cannot change for a given table and get serialized into the table config. In this patch, we are adding support to re-use table configs, so users need to set these configs only in the first commit for a given table. Subsequent writes re-use them from the table config if not explicitly set by the user.
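    
      For illustration, a minimal usage sketch (the dataframes, column names and base path below are placeholders):
    
        // first write: key generation related configs are set and get persisted into the table config
        inputDF.write.format("hudi")
          .option("hoodie.datasource.write.recordkey.field", "id")
          .option("hoodie.datasource.write.partitionpath.field", "dt")
          .option("hoodie.datasource.write.precombine.field", "ts")
          .option("hoodie.table.name", "my_table")
          .mode("overwrite").save(basePath)
    
        // subsequent write: the same configs are re-used from the table config and need not be repeated
        updatesDF.write.format("hudi")
          .option("hoodie.table.name", "my_table")
          .mode("append").save(basePath)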
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  29 +++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  44 ++++--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  22 ++-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  34 ++---
 .../apache/hudi/functional/TestCOWDataSource.scala | 164 +++++++++++++++++++++
 .../hudi/functional/TestStreamingSource.scala      |   4 +
 6 files changed, 260 insertions(+), 37 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index d2c8629df98..1e3c219b6c6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
@@ -830,6 +830,33 @@ object DataSourceOptionsHelper {
     translatedOpt.toMap
   }
 
+  /**
+   * Some config keys differ between what the user sets and what is stored in the table config. This method assists in
+   * fetching the right table config values and populating the corresponding write configs.
+   * @param tableConfig table config of interest.
+   * @param params incoming write params.
+   * @return missing params that need to be added to the incoming write params
+   */
+  def fetchMissingWriteConfigsFromTableConfig(tableConfig: HoodieTableConfig, params: Map[String, String]) : Map[String, String] = {
+    val missingWriteConfigs = scala.collection.mutable.Map[String, String]()
+    if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) && tableConfig.getRecordKeyFieldProp != null) {
+      missingWriteConfigs ++= Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> tableConfig.getRecordKeyFieldProp)
+    }
+    if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) && tableConfig.getPartitionFieldProp != null) {
+      missingWriteConfigs ++= Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> tableConfig.getPartitionFieldProp)
+    }
+    if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) && tableConfig.getKeyGeneratorClassName != null) {
+      missingWriteConfigs ++= Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> tableConfig.getKeyGeneratorClassName)
+    }
+    if (!params.contains(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()) && tableConfig.getPreCombineField != null) {
+      missingWriteConfigs ++= Map(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> tableConfig.getPreCombineField)
+    }
+    if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
+      missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
+    }
+    missingWriteConfigs.toMap
+  }
+
   def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
     // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
     // or else use query type from QUERY_TYPE.
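
For reference, the backfill added above only fills in keys the user did not supply. A standalone sketch of the same idea with plain maps (the table-config key names below are assumptions for illustration, not taken from this patch):

    // Sketch: backfill write configs from table-config values, but only for keys the user did not set.
    object TableConfigBackfillSketch {
      // write-config key -> corresponding table-config key (illustrative mapping)
      private val writeToTableKey = Map(
        "hoodie.datasource.write.recordkey.field"     -> "hoodie.table.recordkey.fields",
        "hoodie.datasource.write.partitionpath.field" -> "hoodie.table.partition.fields",
        "hoodie.datasource.write.keygenerator.class"  -> "hoodie.table.keygenerator.class",
        "hoodie.datasource.write.precombine.field"    -> "hoodie.table.precombine.field",
        "hoodie.datasource.write.payload.class"       -> "hoodie.compaction.payload.class")

      // return only the entries present in the table config but missing from the incoming params
      def missingWriteConfigs(tableConfig: Map[String, String], params: Map[String, String]): Map[String, String] =
        writeToTableKey.collect {
          case (writeKey, tableKey) if !params.contains(writeKey) && tableConfig.contains(tableKey) =>
            writeKey -> tableConfig(tableKey)
        }
    }
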
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 96346652e74..ceb33875f5b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
+import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieWriterUtils._
@@ -121,16 +122,19 @@ object HoodieSparkSqlWriter {
 
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
-    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
+    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
+    // get params without injecting defaults, and validate
+    val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
+    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
-      originKeyGeneratorClassName, parameters)
+      originKeyGeneratorClassName, paramsWithoutDefaults)
 
     // Validate datasource and tableconfig keygen are the same
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
 
+    // re-use table configs and inject defaults.
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -202,7 +206,7 @@ object HoodieSparkSqlWriter {
           .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
           .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
           .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
-          .setKeyGeneratorClassProp(originKeyGeneratorClassName)
+          .setKeyGeneratorClassProp(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key))
           .set(timestampKeyGeneratorConfigs)
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -674,7 +678,8 @@ object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+    // fetch table config for an already existing table when SaveMode is not Overwrite.
+    val tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
@@ -1022,10 +1027,19 @@ object HoodieSparkSqlWriter {
     asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
   }
 
+  /**
+   * Fetch table config for an already existing table if the save mode is not Overwrite.
+   * @param sparkContext instance of {@link SparkContext} to use.
+   * @param tablePath table base path.
+   * @param mode save mode in use.
+   * @param hoodieTableConfigOpt if present, return the table config from this Option; otherwise load it from a new meta client.
+   * @return {@link HoodieTableConfig} if the conditions match; otherwise null.
+   */
   private def getHoodieTableConfig(sparkContext: SparkContext,
                                    tablePath: String,
+                                   mode: SaveMode,
                                    hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
-    if (tableExists) {
+    if (tableExists && mode != SaveMode.Overwrite) {
       hoodieTableConfigOpt.getOrElse(
         HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath)
           .build().getTableConfig)
@@ -1037,17 +1051,23 @@ object HoodieSparkSqlWriter {
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
                                             tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
-    val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
+    var translatedOptsWithMappedTableConfig = mutable.Map.empty ++ translatedOptions.toMap
+    if (tableConfig != null && mode != SaveMode.Overwrite) {
+      // fill in write configs that are missing but have corresponding table config values.
+      fetchMissingWriteConfigsFromTableConfig(tableConfig, optParams).foreach((kv) => translatedOptsWithMappedTableConfig += (kv._1 -> kv._2))
+    }
+    val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
-      mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
+      mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)
     }
     if (null != tableConfig && mode != SaveMode.Overwrite) {
-      tableConfig.getProps.foreach { case (key, value) =>
-        mergedParams(key) = value
+      // override only if not explicitly set by the user.
+      tableConfig.getProps.filter( kv => !mergedParams.contains(kv._1))
+        .foreach { case (key, value) =>
+          mergedParams(key) = value
       }
     }
-
     // use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
     if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
       mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
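
The net effect of the merge above: user-supplied options take priority, missing write configs are then filled from the existing table config (when the table exists and the mode is not Overwrite), and write defaults fill whatever remains. A rough sketch of that precedence with plain maps (illustrative only; the real merge also translates between table-config and write-config key names):

    def mergePrecedenceSketch(userOpts: Map[String, String],
                              tableConfig: Map[String, String],
                              defaults: Map[String, String],
                              overwrite: Boolean): Map[String, String] = {
      // the table config only participates for an existing table that is not being overwritten
      val reusable = if (overwrite) Map.empty[String, String] else tableConfig
      // later maps win on key collisions, so the concatenation order encodes the precedence
      defaults ++ reusable ++ userOpts
    }
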
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 7dda361f523..c9a66b9e3d4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -38,10 +38,6 @@ import scala.collection.JavaConverters._
  */
 object HoodieWriterUtils {
 
-  def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
-    mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
-  }
-
   /**
     * Add default options for unspecified write options keys.
     *
@@ -89,6 +85,21 @@ object HoodieWriterUtils {
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
+  /**
+   * Fetch params, translating alternative config keys if any. Defaults are intentionally not set, since this method
+   * is intended to be called before validation.
+   * @param parameters map of parameters.
+   * @return map of the raw parameters with alternatives translated.
+   */
+  def getParamsWithAlternatives(parameters: Map[String, String]): Map[String, String] = {
+    val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
+    val props = new Properties()
+    props.putAll(parameters)
+    val hoodieConfig: HoodieConfig = new HoodieConfig(props)
+    // do not set any default as this is called before validation.
+    Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
+  }
+
   /**
    * Get the partition columns to stored to hoodie.properties.
    * @param parameters
@@ -146,8 +157,7 @@ object HoodieWriterUtils {
 
         val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
         val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
-        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
-          && datasourcePreCombineKey != tableConfigPreCombineKey) {
+        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey && datasourcePreCombineKey != tableConfigPreCombineKey) {
           diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
         }
 
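
The translation above can be pictured as a simple key rewrite that deliberately adds no defaults, so the subsequent table-config validation only compares values the user actually set. A minimal sketch (the alternatives mapping is hypothetical):

    // Sketch: map alternative/legacy key names onto their canonical names without injecting defaults.
    def withAlternativesSketch(params: Map[String, String],
                               alternatives: Map[String, String] /* alt key -> canonical key */): Map[String, String] =
      params.map { case (k, v) => alternatives.getOrElse(k, k) -> v }
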
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 72dbb0c2539..dc9bff48f38 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -638,10 +638,10 @@ class TestHoodieSparkSqlWriter {
       .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
         HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
       .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
-      .setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
-      .setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
+      .setPayloadClassName(PAYLOAD_CLASS_NAME.key)
+      .setPreCombineField(fooTableParams.getOrElse(PRECOMBINE_FIELD.key, PRECOMBINE_FIELD.defaultValue()))
       .setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
-      .setKeyGeneratorClassProp(fooTableParams(KEYGENERATOR_CLASS_NAME.key))
+      .setKeyGeneratorClassProp(fooTableParams.getOrElse(KEYGENERATOR_CLASS_NAME.key, KEYGENERATOR_CLASS_NAME.defaultValue()))
       if(addBootstrapPath) {
         tableMetaClientBuilder
           .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
@@ -988,7 +988,8 @@ class TestHoodieSparkSqlWriter {
          | ) using hudi
          | partitioned by (dt)
          | options (
-         |  primaryKey = 'id'
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
          | )
          | location '$tablePath1'
        """.stripMargin)
@@ -1002,7 +1003,8 @@ class TestHoodieSparkSqlWriter {
       .options(options)
       .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
       .mode(SaveMode.Append).save(tablePath1)
-    assert(spark.read.format("hudi").load(tablePath1 + "/*").count() == 1)
+    val hudiDf = spark.read.format("hudi").load(tablePath1)
+    assert(hudiDf.count() == 1)
 
     // case 2: test table which created by dataframe
     val (tableName2, tablePath2) = ("hoodie_test_params_2", s"$tempBasePath" + "_2")
@@ -1037,20 +1039,19 @@ class TestHoodieSparkSqlWriter {
       .options(options)
       .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
       .mode(SaveMode.Append).save(tablePath2)
-    val data = spark.read.format("hudi").load(tablePath2 + "/*")
+    val data = spark.read.format("hudi").load(tablePath2)
     assert(data.count() == 2)
     assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
   }
 
   @Test
-  def testNonpartitonedToDefaultKeyGen(): Unit = {
+  def testNonpartitonedWithResuseTableConfig(): Unit = {
     val _spark = spark
     import _spark.implicits._
     val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
-      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts"
     )
 
     // case 1: When commit C1 specifies a key generator and commit C2 does not specify a key generator
@@ -1064,15 +1065,12 @@ class TestHoodieSparkSqlWriter {
       .mode(SaveMode.Overwrite).save(tablePath1)
 
     val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
-    // raise exception when no KEYGENERATOR_CLASS_NAME is specified and it is expected to default to SimpleKeyGenerator
-    val configConflictException = intercept[HoodieException] {
-      df2.write.format("hudi")
-        .options(options)
-        .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
-        .mode(SaveMode.Append).save(tablePath1)
-    }
-    assert(configConflictException.getMessage.contains("Config conflict"))
-    assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[SimpleKeyGenerator].getName}\t${classOf[NonpartitionedKeyGenerator].getName}"))
+    // In the first commit, we explicitly override the key generator to NonpartitionedKeyGenerator. In the 2nd batch, since
+    // table config re-use comes into play, no exception should be thrown even if we don't supply any key gen class.
+    df2.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .mode(SaveMode.Append).save(tablePath1)
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index a0fc42673b9..6d8cf6c7fd0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -142,6 +142,170 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
+  @Test
+  def testReuseTableConfigs() {
+    val recordType = HoodieRecordType.AVRO
+    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      HoodieMetadataConfig.ENABLE.key -> "false" // this test exercises table configs and write configs; disable metadata to save on test run time.
+    ))
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    ) ++ writeOpts
+
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+    val optsWithNoRepeatedTableConfig = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    ) ++ writeOpts
+    // this write should succeed even without setting any params for record key or partition path, since the table config will be re-used.
+    writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+  }
+
+  @Test
+  def testSimpleKeyGenDroppingConfigs() {
+    val recordType = HoodieRecordType.AVRO
+    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      HoodieMetadataConfig.ENABLE.key -> "false", // this test exercises table configs and write configs; disable metadata to save on test run time.
+      "hoodie.datasource.write.keygenerator.class" -> classOf[SimpleKeyGenerator].getCanonicalName
+    ))
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    ) ++ writeOpts
+
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+    val optsWithNoRepeatedTableConfig = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    )
+    // this write should succeed even though we don't set the key gen class explicitly.
+    writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+  }
+
+  @Test
+  def testSimpleKeyGenExtraneuousAddition() {
+    val recordType = HoodieRecordType.AVRO
+    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1"
+    ))
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      HoodieMetadataConfig.ENABLE.key -> "false" // this test exercises table configs and write configs; disable metadata to save on test run time.
+    ) ++ writeOpts
+
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+    val optsWithNoRepeatedTableConfig = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.datasource.write.keygenerator.class" -> classOf[SimpleKeyGenerator].getCanonicalName,
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    )
+    // this write should succeed even though we set the key gen class explicitly, since it is the default.
+    writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+  }
+
+  private def writeToHudi(opts: Map[String, String], df: Dataset[Row]) : Unit = {
+    df.write.format("hudi")
+      .options(opts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("hoodie.datasource.write.recordkey.field,begin_lat", "hoodie.datasource.write.partitionpath.field,end_lon",
+    "hoodie.datasource.write.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.write.precombine.field,fare"))
+  def testAlteringRecordKeyConfig(configKey: String, configValue: String) {
+    val recordType = HoodieRecordType.AVRO
+    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      "hoodie.datasource.write.precombine.field" -> "ts",
+      HoodieMetadataConfig.ENABLE.key -> "false" // this test exercises table configs and write configs; disable metadata to save on test run time.
+    ))
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      HoodieMetadataConfig.ENABLE.key -> "false"
+    ) ++ writeOpts
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+
+    spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+    val optsForBatch2 = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieMetadataConfig.ENABLE.key -> "false",
+      configKey -> configValue
+    )
+
+    // this write should fail since we are explicitly setting a config which wasn't set in the first commit and does not match the default value.
+    val t = assertThrows(classOf[Throwable]) {
+      writeToHudi(optsForBatch2, inputDF)
+    }
+    assertTrue(getRootCause(t).getMessage.contains("Config conflict"))
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testHoodieIsDeletedNonBooleanField(recordType: HoodieRecordType) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index dede9fad45b..84f913cb1a8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -55,6 +55,7 @@ class TestStreamingSource extends StreamTest {
           .setTableType(COPY_ON_WRITE)
           .setTableName(getTableName(tablePath))
           .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+          .setPreCombineField("ts")
           .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -105,6 +106,7 @@ class TestStreamingSource extends StreamTest {
         .setTableType(MERGE_ON_READ)
         .setTableName(getTableName(tablePath))
         .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -149,6 +151,7 @@ class TestStreamingSource extends StreamTest {
         .setTableType(COPY_ON_WRITE)
         .setTableName(getTableName(tablePath))
         .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -179,6 +182,7 @@ class TestStreamingSource extends StreamTest {
         .setTableType(COPY_ON_WRITE)
         .setTableName(getTableName(tablePath))
         .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))