Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/15 15:10:42 UTC

[hudi] branch master updated: [HUDI-3403] Ensure keygen props are set for bootstrap (#6645)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a3921a845f [HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
a3921a845f is described below

commit a3921a845f608f2018ec3237f9d7421696b7b05d
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Thu Sep 15 20:40:37 2022 +0530

    [HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
---
 .../hudi/common/table/HoodieTableMetaClient.java   |  3 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 10 +++--
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 18 +++++----
 .../functional/TestDataSourceForBootstrap.scala    | 43 +++++++++++++---------
 4 files changed, 43 insertions(+), 31 deletions(-)
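
[Editor's note] In short: during a metadata bootstrap, the Spark writer previously persisted
only the table-level key generator property, so a key generator supplied via the
bootstrap-scoped config (HoodieBootstrapConfig.KEYGEN_CLASS_NAME) could be lost from
hoodie.properties. This patch makes the bootstrap-scoped setting take precedence and also
carries TimestampBasedKeyGenerator options through to the meta client. A minimal sketch of a
write that exercises this path, assuming hypothetical paths, table name, and record key field
(none of these are part of the commit):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieBootstrapConfig
    import org.apache.hudi.keygen.NonpartitionedKeyGenerator
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    spark.emptyDataFrame.write
      .format("hudi")
      .option("hoodie.table.name", "bootstrap_tbl")  // hypothetical table name
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")  // hypothetical record key
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(HoodieBootstrapConfig.BASE_PATH.key, "/tmp/source-parquet")  // hypothetical source path
      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi-table")  // hypothetical target path

With the fix, the NonpartitionedKeyGenerator class above is what ends up recorded in the
table's key generator property, as the tests further down verify.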

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index caab37162c..16dd373486 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -876,11 +876,10 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
-    public PropertyBuilder set(String key, Object value) {
+    private void set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
       }
-      return this;
     }
 
     public PropertyBuilder set(Map<String, Object> props) {
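
[Editor's note] The single-key set(String, Object) above is narrowed from public to private,
so the public surface for ad-hoc table properties is now the map-based set, which the Spark
writer (next file) uses to forward timestamp key-generator configs. A sketch of builder usage
under that API, assuming an illustrative config key and table name; only keys present in
HoodieTableConfig.PERSISTED_CONFIG_LIST are actually retained by set():

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import scala.collection.JavaConverters._

    // Illustrative timestamp-keygen property; key name is an assumption, not from the patch.
    val timestampKeyGenConfigs: java.util.Map[String, Object] =
      Map[String, Object]("hoodie.keygen.timebased.output.dateformat" -> "yyyy/MM/dd").asJava

    val builder = HoodieTableMetaClient.withPropertyBuilder()
      .setTableName("bootstrap_tbl")      // hypothetical table name
      .set(timestampKeyGenConfigs)        // bulk-set; filters against PERSISTED_CONFIG_LIST
    // A subsequent builder.initTable(hadoopConf, basePath) persists them to hoodie.properties.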
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 028de00f7f..99be676f14 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -469,7 +469,10 @@ object HoodieSparkSqlWriter {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
-        val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        val keyGenProp =
+          if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
+          else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
         val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
           HoodieTableConfig.POPULATE_META_FIELDS.key(),
           String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
@@ -493,6 +496,7 @@ object HoodieSparkSqlWriter {
           .setPartitionFields(partitionColumns)
           .setPopulateMetaFields(populateMetaFields)
           .setKeyGeneratorClassProp(keyGenProp)
+          .set(timestampKeyGeneratorConfigs)
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
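
[Editor's note] The hunk above changes key-generator resolution during bootstrap: the
bootstrap-scoped KEYGEN_CLASS_NAME wins when set, with the table-level config as the
fallback, and the resolved class then drives extraction of timestamp-keygen settings into
the meta client. The precedence rule, restated as a standalone helper (the helper name is
illustrative, not part of the patch):

    import org.apache.hudi.common.config.HoodieConfig
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.common.util.StringUtils
    import org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME

    def resolveBootstrapKeyGenClass(hoodieConfig: HoodieConfig): String = {
      // Prefer the bootstrap-scoped key generator when it is non-empty...
      val bootstrapKeyGen = hoodieConfig.getString(KEYGEN_CLASS_NAME)
      if (StringUtils.nonEmpty(bootstrapKeyGen)) bootstrapKeyGen
      // ...otherwise fall back to the table-level key generator config.
      else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
    }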
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 93469f2796..89f0acaf01 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -17,9 +17,6 @@
 
 package org.apache.hudi
 
-import java.io.IOException
-import java.time.Instant
-import java.util.{Collections, Date, UUID}
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
+import java.io.IOException
+import java.time.Instant
+import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
 
@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
         HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
+      initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
 
       val client = spy(DataSourceUtils.createHoodieClient(
         new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
     }
   }
 
-  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
+  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
     // when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
     // will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
     // hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@ class TestHoodieSparkSqlWriter {
         tableMetaClientBuilder
           .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
       }
-    tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    if (initBasePath) {
+      tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    }
   }
 
   /**
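
[Editor's note] For reference, the two modes the new initBasePath flag enables, as exercised
by the updated call sites earlier in this file:

    // Pre-create the table on disk; no bootstrap base path recorded:
    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
    // Record the bootstrap base path, but let the bootstrap write itself create the table:
    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)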
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f0dd89df1c..23b21b315f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
@@ -31,9 +31,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
 import java.time.Instant
 import java.util.Collections
-
 import scala.collection.JavaConverters._
 
 class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
+    val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
+    val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
+      bootstrapKeygenClass = bootstrapKeygenClass
     )
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      Some("datestr"),
-      Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+      classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@ class TestDataSourceForBootstrap {
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          partitionColumns: Option[String] = None,
-                                          extraOpts: Map[String, String] = Map.empty): String = {
+                                          extraOpts: Map[String, String] = Map.empty,
+                                          bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
-      .options(commonOpts)
       .options(extraOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
       .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
-      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -528,7 +535,7 @@ class TestDataSourceForBootstrap {
       .load(basePath)
 
     assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
 
@@ -537,10 +544,10 @@ class TestDataSourceForBootstrap {
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
-      .load(basePath);
+      .load(basePath)
 
     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
+    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
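
[Editor's note] One consequence of the refactored helper signature worth noting: because the
required bootstrapKeygenClass parameter follows the defaulted extraOpts parameter, every call
site now passes both arguments, making the key generator explicit at each bootstrap
invocation. A sketch mirroring the updated hive-style-partitioned COW call site from this
diff:

    val commitInstant = runMetadataBootstrapAndVerifyCommit(
      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++
        Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
      classOf[SimpleKeyGenerator].getName)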