Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/15 15:10:42 UTC
[hudi] branch master updated: [HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a3921a845f [HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
a3921a845f is described below
commit a3921a845f608f2018ec3237f9d7421696b7b05d
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Thu Sep 15 20:40:37 2022 +0530
[HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
---
.../hudi/common/table/HoodieTableMetaClient.java | 3 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 10 +++--
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 18 +++++----
.../functional/TestDataSourceForBootstrap.scala | 43 +++++++++++++---------
4 files changed, 43 insertions(+), 31 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index caab37162c..16dd373486 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -876,11 +876,10 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
- public PropertyBuilder set(String key, Object value) {
+ private void set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
}
- return this;
}
public PropertyBuilder set(Map<String, Object> props) {
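Note: set(String, Object) is now private and no longer returns the builder, so
code outside the class goes through the public map overload above, which still
filters keys against HoodieTableConfig.PERSISTED_CONFIG_LIST. A minimal Scala
sketch of the intended call pattern (the map contents are illustrative;
withPropertyBuilder() and set(Map) are the existing public builder API):

    import java.util.Collections
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // only keys in PERSISTED_CONFIG_LIST are retained by set(Map)
    val extraProps: java.util.Map[String, Object] =
      Collections.singletonMap[String, Object](
        "hoodie.table.keygenerator.class",
        "org.apache.hudi.keygen.SimpleKeyGenerator")
    HoodieTableMetaClient.withPropertyBuilder().set(extraProps)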
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 028de00f7f..99be676f14 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -469,7 +469,10 @@ object HoodieSparkSqlWriter {
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
- val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+ val keyGenProp =
+ if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
+ else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+ val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
@@ -493,6 +496,7 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
+ .set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
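Note: for bootstrap writes, the key generator class persisted in the table
config is now resolved from HoodieBootstrapConfig.KEYGEN_CLASS_NAME first,
falling back to HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, and any
TimestampBasedKeyGenerator configs are persisted alongside it through the map
overload of set(). A self-contained Scala sketch of the fallback (the two
string literals are assumed to be the keys behind those two configs; the
helper itself is illustrative, not Hudi API):

    def resolveKeyGenClass(conf: Map[String, String]): Option[String] =
      conf.get("hoodie.bootstrap.keygen.class").filter(_.nonEmpty)
        .orElse(conf.get("hoodie.table.keygenerator.class").filter(_.nonEmpty))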
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 93469f2796..89f0acaf01 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -17,9 +17,6 @@
package org.apache.hudi
-import java.io.IOException
-import java.time.Instant
-import java.util.{Collections, Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
+import java.io.IOException
+import java.time.Instant
+import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
- initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
}
}
- def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
+ def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
// when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
// will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
// hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@ class TestHoodieSparkSqlWriter {
tableMetaClientBuilder
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
}
- tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+ if (initBasePath) {
+ tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+ }
}
/**
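Note: initializeMetaClientForBootstrap now takes an initBasePath flag so the
helper only creates the table when asked. Bootstrap tests pass
initBasePath = false and let the bootstrap write itself initialize the table,
presumably so the keygen props set during that write are the ones persisted.
The two call shapes, as used in the hunks above:

    // plain write path: create the table up front
    initializeMetaClientForBootstrap(fooTableParams, tableType,
      addBootstrapPath = false, initBasePath = true)
    // bootstrap path: the bootstrap write initializes the table
    initializeMetaClientForBootstrap(fooTableParams, tableType,
      addBootstrapPath = true, initBasePath = false)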
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f0dd89df1c..23b21b315f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
@@ -31,9 +31,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
import java.time.Instant
import java.util.Collections
-
import scala.collection.JavaConverters._
class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
.save(srcPath)
// Perform bootstrap
+ val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
+ val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
+ bootstrapKeygenClass = bootstrapKeygenClass
)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@ class TestDataSourceForBootstrap {
updateDF.write
.format("hudi")
- .options(commonOpts)
+ .options(options)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
.mode(SaveMode.Append)
.save(basePath)
@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- Some("datestr"),
- Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+ classOf[SimpleKeyGenerator].getName)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count using glob path
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@ class TestDataSourceForBootstrap {
}
def runMetadataBootstrapAndVerifyCommit(tableType: String,
- partitionColumns: Option[String] = None,
- extraOpts: Map[String, String] = Map.empty): String = {
+ extraOpts: Map[String, String] = Map.empty,
+ bootstrapKeygenClass: String): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
- .options(commonOpts)
.options(extraOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
.option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
- .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+ .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -528,7 +535,7 @@ class TestDataSourceForBootstrap {
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
- var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
@@ -537,10 +544,10 @@ class TestDataSourceForBootstrap {
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
- .load(basePath);
+ .load(basePath)
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
- countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
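Note: runMetadataBootstrapAndVerifyCommit no longer mixes in commonOpts or a
separate partition-path option; each test now passes complete write options
plus the bootstrap key generator class explicitly. A sketch of the
non-partitioned call, mirroring the first hunk in this file (all names come
from the test class):

    val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      extraOpts = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key) ++
        Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
      bootstrapKeygenClass = bootstrapKeygenClass)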