Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/18 17:27:33 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #7668: [HUDI-2681] Make hoodie record_key and preCombine_key optional

nsivabalan commented on code in PR #7668:
URL: https://github.com/apache/hudi/pull/7668#discussion_r1073828099


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -77,6 +77,28 @@ object HoodieSparkSqlWriter {
   private var asyncCompactionTriggerFnDefined: Boolean = false
   private var asyncClusteringTriggerFnDefined: Boolean = false
 
+  def changeOperationToInsertIfRequired(writeOperationType: WriteOperationType, hoodieConfig: HoodieConfig)

Review Comment:
   Let's rename this to handleConfigsForAutoGenerationOfRecordKeys; that name better reflects that the method handles the configs for auto-generated record keys rather than just the operation change.
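   
   For reference, a minimal sketch of what the renamed helper could look like, reusing the WriteOperationType and HoodieConfig types already imported in HoodieSparkSqlWriter. The body is an assumption inferred from the original name changeOperationToInsertIfRequired, not the PR's actual implementation:
   
       def handleConfigsForAutoGenerationOfRecordKeys(writeOperationType: WriteOperationType,
                                                      hoodieConfig: HoodieConfig): WriteOperationType = {
         // assumed behavior: auto-generated record keys cannot address existing
         // records, so an upsert is downgraded to a plain insert; hoodieConfig is
         // kept only for signature parity with the method under review
         if (writeOperationType == WriteOperationType.UPSERT) WriteOperationType.INSERT
         else writeOperationType
       }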



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -159,7 +164,15 @@ object HoodieWriterUtils {
           diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
         }
 
-        val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null)
+        val typedProperties = new TypedProperties()
+        typedProperties.putAll(params)
+        val keyGeneratorClass = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(typedProperties)
+        val datasourcePartitionFields = if (keyGeneratorClass == classOf[KeylessKeyGenerator].getName) {
+          val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)
+          SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(params))

Review Comment:
   Let's revisit this after 5574.
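   
   For context, the excerpt above is cut off before the else branch; presumably the non-keyless path keeps the previous lookup that this change replaces, roughly:
   
       val datasourcePartitionFields = if (keyGeneratorClass == classOf[KeylessKeyGenerator].getName) {
         val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)
         SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(params))
       } else {
         // assumed fallback: the pre-change behavior from the removed line
         params.getOrElse(PARTITIONPATH_FIELD.key(), null)
       }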



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala:
##########
@@ -1179,6 +1177,88 @@ class TestHoodieSparkSqlWriter {
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testAutoGenerationOfRecordKeysWithCombineBeforeInsert(tableType: String): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.KeylessKeyGenerator",
+      DataSourceWriteOptions.OPERATION.key() -> INSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition:simple",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "false")
+
+    try {
+      // verify exception is thrown when HoodieWriteConfig.COMBINE_BEFORE_INSERT is enabled
+      val tmpDF = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+      tmpDF.write.format("org.apache.hudi")
+        .options(initialOpts + (HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true"))
+        .mode(SaveMode.Append)
+        .save(tempBasePath)
+      // the write above should throw; without this, the test would pass
+      // silently if no exception is raised (assumes JUnit's Assertions.fail is in scope)
+      fail("expected HoodieException when COMBINE_BEFORE_INSERT is enabled with KeylessKeyGenerator")
+    } catch {
+      case e: HoodieException =>
+        assertTrue(e.getMessage.contains(s"${classOf[KeylessKeyGenerator].getName} can not be used with config enabled " +
+          s"{${HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()}}"), e.getMessage)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testAutoGenerationOfRecordKeys(tableType: String): Unit = {
+
+    val dataGen = new HoodieTestDataGenerator()
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.KeylessKeyGenerator",
+      DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition:simple",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "false")
+
+    var totalRecs = 0
+    val extraOptsList = List(
+      Map.empty,
+      Map(
+        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key")
+    )
+    for (extraOpt <- extraOptsList) {
+      val opts = initialOpts ++ extraOpt
+      for (x <- 1 to 5) {
+        val instantTime = "00" + x
+        val genRecsList = if (x == 1) {
+          totalRecs += 100 * 2
+          val inserts = dataGen.generateInserts(instantTime, 100)
+          inserts.addAll(inserts)
+          inserts
+        } else {
+          totalRecs += 10
+          dataGen.generateUniqueUpdates(instantTime, 10)
+        }
+        val records = recordsToStrings(genRecsList).toList
+        val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+        inputDF.write.format("org.apache.hudi")
+          .options(opts)
+          .mode(SaveMode.Append)
+          .save(tempBasePath)
+
+        val snapshotDF = spark.read.format("org.apache.hudi")
+          .load(tempBasePath)
+        assertEquals(totalRecs, snapshotDF.count())
+      }
+    }
+
+    val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tempBasePath).build()
+    // compare against the parameterized table type; hardcoding COPY_ON_WRITE
+    // would fail the MERGE_ON_READ run of this test
+    assertEquals(HoodieTableType.valueOf(tableType), metaClient.getTableConfig.getTableType)

Review Comment:
   Can we write tests for the exception/non-happy paths:
   1. If combine.before.insert is set and we send duplicate records, all duplicates should be present in the snapshot read.
   2. Verify that the preCombine field is not set in the table config; ensure the test sets it explicitly. (A sketch of this check follows below.)
   3. If the record key field config is set, it should not throw when keyless is enabled; also try injecting records with the same record keys.
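   
   For instance, item 2 could be asserted roughly like this, reusing the metaClient pattern from the test above (a sketch, not the required implementation; assumes JUnit's assertNull is in scope and that getPreCombineField returns null when the field was never set):
   
       val metaClient = HoodieTableMetaClient.builder()
         .setConf(spark.sparkContext.hadoopConfiguration)
         .setBasePath(tempBasePath)
         .build()
       // a keyless write that never configured PRECOMBINE_FIELD should leave
       // no preCombine field behind in the table config
       assertNull(metaClient.getTableConfig.getPreCombineField)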



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org