Posted to commits@hudi.apache.org by si...@apache.org on 2023/03/03 15:09:14 UTC
[hudi] branch master updated: [HUDI-5665] Adding support to re-use table configs (#7901)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cfe490fcb23 [HUDI-5665] Adding support to re-use table configs (#7901)
cfe490fcb23 is described below
commit cfe490fcb2333049b4f47a2d1d241b07e12d42c1
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Mar 3 07:09:03 2023 -0800
[HUDI-5665] Adding support to re-use table configs (#7901)
- As of now, we expect users to set some mandatory fields on every write, for example record key, partition path, etc. These cannot change for a given table and get serialized into the table config. In this patch, we add support to re-use table configs: users can set these configs only in the first commit for a given table, and subsequent writes re-use the values from the table config if they are not explicitly set by the user.
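For illustration, a minimal sketch of the intended usage against the Spark datasource (assumes an active Spark session; the DataFrames df1/df2, the table name and basePath below are hypothetical, while the option keys are the standard Hudi datasource write options):

    import org.apache.spark.sql.SaveMode

    // First commit: mandatory table-level configs are set explicitly and get
    // serialized into the table config (hoodie.properties).
    df1.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Subsequent commit: record key, partition path, precombine field, etc. can be
    // omitted; they are re-used from the table config written by the first commit.
    df2.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      .mode(SaveMode.Append)
      .save(basePath)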
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 29 +++-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 44 ++++--
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 22 ++-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 34 ++---
.../apache/hudi/functional/TestCOWDataSource.scala | 164 +++++++++++++++++++++
.../hudi/functional/TestStreamingSource.scala | 4 +
6 files changed, 260 insertions(+), 37 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index d2c8629df98..1e3c219b6c6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodiePayloadConfig, HoodieWriteConfig}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
@@ -830,6 +830,33 @@ object DataSourceOptionsHelper {
translatedOpt.toMap
}
+ /**
+ * Some config keys differ between what the user sets and what is stored in the table config. This method assists in
+ * fetching the right table config values and populating the corresponding write configs.
+ * @param tableConfig table config of interest.
+ * @param params incoming write params.
+ * @return missing params that need to be added to the incoming write params
+ */
+ def fetchMissingWriteConfigsFromTableConfig(tableConfig: HoodieTableConfig, params: Map[String, String]) : Map[String, String] = {
+ val missingWriteConfigs = scala.collection.mutable.Map[String, String]()
+ if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) && tableConfig.getRecordKeyFieldProp != null) {
+ missingWriteConfigs ++= Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> tableConfig.getRecordKeyFieldProp)
+ }
+ if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) && tableConfig.getPartitionFieldProp != null) {
+ missingWriteConfigs ++= Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> tableConfig.getPartitionFieldProp)
+ }
+ if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) && tableConfig.getKeyGeneratorClassName != null) {
+ missingWriteConfigs ++= Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> tableConfig.getKeyGeneratorClassName)
+ }
+ if (!params.contains(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()) && tableConfig.getPreCombineField != null) {
+ missingWriteConfigs ++= Map(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> tableConfig.getPreCombineField)
+ }
+ if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
+ missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
+ }
+ missingWriteConfigs.toMap
+ }
+
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
// or else use query type from QUERY_TYPE.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 96346652e74..ceb33875f5b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
+import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
@@ -121,16 +122,19 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
- val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
- val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
+ var tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
+ // get params without injecting defaults, and validate
+ val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
+ val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
- originKeyGeneratorClassName, parameters)
+ originKeyGeneratorClassName, paramsWithoutDefaults)
// Validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+ // re-use table configs and inject defaults.
+ val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -202,7 +206,7 @@ object HoodieSparkSqlWriter {
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
- .setKeyGeneratorClassProp(originKeyGeneratorClassName)
+ .setKeyGeneratorClassProp(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key))
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -674,7 +678,8 @@ object HoodieSparkSqlWriter {
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+ // fetch table config only for an existing table and when SaveMode is not Overwrite.
+ val tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
@@ -1022,10 +1027,19 @@ object HoodieSparkSqlWriter {
asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
}
+ /**
+ * Fetch the table config for an existing table when the save mode is not Overwrite.
+ * @param sparkContext instance of {@link SparkContext} to use.
+ * @param tablePath table base path.
+ * @param mode save mode in use.
+ * @param hoodieTableConfigOpt return the table config from this Option if present; else fetch it from a new metaClient.
+ * @return {@link HoodieTableConfig} if the conditions match; otherwise, null.
+ */
private def getHoodieTableConfig(sparkContext: SparkContext,
tablePath: String,
+ mode: SaveMode,
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
- if (tableExists) {
+ if (tableExists && mode != SaveMode.Overwrite) {
hoodieTableConfigOpt.getOrElse(
HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath)
.build().getTableConfig)
@@ -1037,17 +1051,23 @@ object HoodieSparkSqlWriter {
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
- val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
+ var translatedOptsWithMappedTableConfig = mutable.Map.empty ++ translatedOptions.toMap
+ if (tableConfig != null && mode != SaveMode.Overwrite) {
+ // fill in any missing write configs from the corresponding table configs.
+ fetchMissingWriteConfigsFromTableConfig(tableConfig, optParams).foreach((kv) => translatedOptsWithMappedTableConfig += (kv._1 -> kv._2))
+ }
+ val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
&& mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
- mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
+ mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)
}
if (null != tableConfig && mode != SaveMode.Overwrite) {
- tableConfig.getProps.foreach { case (key, value) =>
- mergedParams(key) = value
+ // override only if not explicitly set by the user.
+ tableConfig.getProps.filter( kv => !mergedParams.contains(kv._1))
+ .foreach { case (key, value) =>
+ mergedParams(key) = value
}
}
-
// use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 7dda361f523..c9a66b9e3d4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -38,10 +38,6 @@ import scala.collection.JavaConverters._
*/
object HoodieWriterUtils {
- def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
- mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
- }
-
/**
* Add default options for unspecified write options keys.
*
@@ -89,6 +85,21 @@ object HoodieWriterUtils {
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
+ /**
+ * Fetch params, translating alternatives if any. Do not set any defaults, as this method is intended to be called
+ * before validation.
+ * @param parameters hash map of parameters.
+ * @return map of raw parameters with alternatives translated.
+ */
+ def getParamsWithAlternatives(parameters: Map[String, String]): Map[String, String] = {
+ val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
+ val props = new Properties()
+ props.putAll(parameters)
+ val hoodieConfig: HoodieConfig = new HoodieConfig(props)
+ // do not set any default as this is called before validation.
+ Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
+ }
+
/**
* Get the partition columns to stored to hoodie.properties.
* @param parameters
@@ -146,8 +157,7 @@ object HoodieWriterUtils {
val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
- if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
- && datasourcePreCombineKey != tableConfigPreCombineKey) {
+ if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey && datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 72dbb0c2539..dc9bff48f38 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -638,10 +638,10 @@ class TestHoodieSparkSqlWriter {
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
- .setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
- .setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
+ .setPayloadClassName(PAYLOAD_CLASS_NAME.key)
+ .setPreCombineField(fooTableParams.getOrElse(PRECOMBINE_FIELD.key, PRECOMBINE_FIELD.defaultValue()))
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
- .setKeyGeneratorClassProp(fooTableParams(KEYGENERATOR_CLASS_NAME.key))
+ .setKeyGeneratorClassProp(fooTableParams.getOrElse(KEYGENERATOR_CLASS_NAME.key, KEYGENERATOR_CLASS_NAME.defaultValue()))
if(addBootstrapPath) {
tableMetaClientBuilder
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
@@ -988,7 +988,8 @@ class TestHoodieSparkSqlWriter {
| ) using hudi
| partitioned by (dt)
| options (
- | primaryKey = 'id'
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
| )
| location '$tablePath1'
""".stripMargin)
@@ -1002,7 +1003,8 @@ class TestHoodieSparkSqlWriter {
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Append).save(tablePath1)
- assert(spark.read.format("hudi").load(tablePath1 + "/*").count() == 1)
+ val hudiDf = spark.read.format("hudi").load(tablePath1)
+ assert(hudiDf.count() == 1)
// case 2: test table which created by dataframe
val (tableName2, tablePath2) = ("hoodie_test_params_2", s"$tempBasePath" + "_2")
@@ -1037,20 +1039,19 @@ class TestHoodieSparkSqlWriter {
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName2)
.mode(SaveMode.Append).save(tablePath2)
- val data = spark.read.format("hudi").load(tablePath2 + "/*")
+ val data = spark.read.format("hudi").load(tablePath2)
assert(data.count() == 2)
assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
}
@Test
- def testNonpartitonedToDefaultKeyGen(): Unit = {
+ def testNonpartitonedWithResuseTableConfig(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
- DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts"
)
// case 1: When commit C1 specifies a key generator and commit C2 does not specify a key generator
@@ -1064,15 +1065,12 @@ class TestHoodieSparkSqlWriter {
.mode(SaveMode.Overwrite).save(tablePath1)
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
- // raise exception when no KEYGENERATOR_CLASS_NAME is specified and it is expected to default to SimpleKeyGenerator
- val configConflictException = intercept[HoodieException] {
- df2.write.format("hudi")
- .options(options)
- .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
- .mode(SaveMode.Append).save(tablePath1)
- }
- assert(configConflictException.getMessage.contains("Config conflict"))
- assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[SimpleKeyGenerator].getName}\t${classOf[NonpartitionedKeyGenerator].getName}"))
+ // In the first commit, we explicitly override it to Nonpartitioned, whereas in the 2nd batch, since re-use of table configs
+ // comes into play, no exception should be thrown even if we don't supply any key gen class.
+ df2.write.format("hudi")
+ .options(options)
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+ .mode(SaveMode.Append).save(tablePath1)
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index a0fc42673b9..6d8cf6c7fd0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -142,6 +142,170 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
}
+ @Test
+ def testReuseTableConfigs() {
+ val recordType = HoodieRecordType.AVRO
+ val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ HoodieMetadataConfig.ENABLE.key -> "false" // this test covers table configs and write configs; disabling metadata to save on test run time.
+ ))
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val commonOptsNoPreCombine = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ ) ++ writeOpts
+
+ writeToHudi(commonOptsNoPreCombine, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+ val optsWithNoRepeatedTableConfig = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ ) ++ writeOpts
+ // this write should succeed even without setting any params for record key or partition path, since the table config will be re-used.
+ writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+ }
+
+ @Test
+ def testSimpleKeyGenDroppingConfigs() {
+ val recordType = HoodieRecordType.AVRO
+ val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ HoodieMetadataConfig.ENABLE.key -> "false", // this test covers table configs and write configs; disabling metadata to save on test run time.
+ "hoodie.datasource.write.keygenerator.class" -> classOf[SimpleKeyGenerator].getCanonicalName
+ ))
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val commonOptsNoPreCombine = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ ) ++ writeOpts
+
+ writeToHudi(commonOptsNoPreCombine, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+ val optsWithNoRepeatedTableConfig = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ )
+ // this write should succeed even though we don't set the key gen explicitly.
+ writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+ }
+
+ @Test
+ def testSimpleKeyGenExtraneuousAddition() {
+ val recordType = HoodieRecordType.AVRO
+ val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1"
+ ))
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val commonOptsNoPreCombine = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.ENABLE.key -> "false" // this test covers table configs and write configs; disabling metadata to save on test run time.
+ ) ++ writeOpts
+
+ writeToHudi(commonOptsNoPreCombine, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+ val optsWithNoRepeatedTableConfig = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.datasource.write.keygenerator.class" -> classOf[SimpleKeyGenerator].getCanonicalName,
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ )
+ // this write should succeed even though we set the key gen explicitly, since it is the default
+ writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+ }
+
+ private def writeToHudi(opts: Map[String, String], df: Dataset[Row]) : Unit = {
+ df.write.format("hudi")
+ .options(opts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
+ @ParameterizedTest
+ @CsvSource(Array("hoodie.datasource.write.recordkey.field,begin_lat", "hoodie.datasource.write.partitionpath.field,end_lon",
+ "hoodie.datasource.write.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.write.precombine.field,fare"))
+ def testAlteringRecordKeyConfig(configKey: String, configValue: String) {
+ val recordType = HoodieRecordType.AVRO
+ val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ "hoodie.datasource.write.precombine.field" -> "ts",
+ HoodieMetadataConfig.ENABLE.key -> "false" // this test covers table configs and write configs; disabling metadata to save on test run time.
+ ))
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val commonOptsNoPreCombine = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ ) ++ writeOpts
+ writeToHudi(commonOptsNoPreCombine, inputDF)
+
+ spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+
+ val optsForBatch2 = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieMetadataConfig.ENABLE.key -> "false",
+ configKey -> configValue
+ )
+
+ // this write should fail since we are explicitly setting a config which wasn't set in the first commit and does not match the default value.
+ val t = assertThrows(classOf[Throwable]) {
+ writeToHudi(optsForBatch2, inputDF)
+ }
+ assertTrue(getRootCause(t).getMessage.contains("Config conflict"))
+ }
+
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testHoodieIsDeletedNonBooleanField(recordType: HoodieRecordType) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index dede9fad45b..84f913cb1a8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -55,6 +55,7 @@ class TestStreamingSource extends StreamTest {
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -105,6 +106,7 @@ class TestStreamingSource extends StreamTest {
.setTableType(MERGE_ON_READ)
.setTableName(getTableName(tablePath))
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -149,6 +151,7 @@ class TestStreamingSource extends StreamTest {
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))
@@ -179,6 +182,7 @@ class TestStreamingSource extends StreamTest {
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))