Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/07 18:06:34 UTC

[hudi] 12/21: Adding tests to validate different key generators (#4473)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fb084e0d93c4217f668614eb06eed9bdcf009278
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Tue Jan 4 00:18:04 2022 -0500

    Adding tests to validate different key generators (#4473)
---
 .../hudi/functional/TestCOWDataSourceStorage.scala | 85 ++++++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index af2bc69..bf616e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -26,25 +26,24 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
+import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
-import java.util
-import java.util.Arrays
-import java.util.stream.Stream
 import scala.collection.JavaConversions._
 
 
 @Tag("functional")
 class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
-  val commonOpts = Map(
+  var commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
@@ -59,14 +58,26 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean): Unit = {
+  @CsvSource(Array("true,org.apache.hudi.keygen.SimpleKeyGenerator", "true,org.apache.hudi.keygen.ComplexKeyGenerator",
+    "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator", "false,org.apache.hudi.keygen.SimpleKeyGenerator",
+    "false,org.apache.hudi.keygen.ComplexKeyGenerator", "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"))
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = {
+    commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass
+    if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
+      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key, pii_col"
+    }
+    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
+      commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "current_ts"
+      commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
+      commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+    }
     val dataGen = new HoodieTestDataGenerator()
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Insert Operation
-    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
+    val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    inputDF0.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
@@ -82,9 +93,18 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .load(basePath)
     assertEquals(100, snapshotDF1.count())
 
-    // Upsert based on the written table with Hudi metadata columns
-    val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
-    val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    val records1 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val verificationRowKey = inputDF1.limit(1).select("_row_key").first.getString(0)
+    var updateDf: DataFrame = null
+    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+      // keep current_ts the same as in the original record so that the partition path does not change with the timestamp-based key generator
+      val originalRow = inputDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
+      updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+        .withColumn("current_ts", lit(originalRow.getAs("current_ts")))
+    } else {
+      updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    }
 
     updateDf.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -100,8 +120,26 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
 
     // Upsert Operation without Hudi metadata columns
-    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2))
+    val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
+    var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+      // In the case of the timestamp-based key generator, current_ts should not be updated, but
+      // dataGen.generateUpdates() will have changed its value, so we need to revert it to the original.
+      // To do that, copy the values to temp columns, join with the original DataFrame to recover the
+      // original current_ts, and then drop the temp columns.
+
+      val inputDF2WithTempCols = inputDF2.withColumn("current_ts_temp", col("current_ts"))
+        .withColumn("_row_key_temp", col("_row_key"))
+      val originalRowCurrentTsDf = inputDF0.select("_row_key", "current_ts")
+      // join with original df
+      val joinedDf = inputDF2WithTempCols.drop("_row_key", "current_ts").join(originalRowCurrentTsDf, (inputDF2WithTempCols("_row_key_temp") === originalRowCurrentTsDf("_row_key")))
+      // replace the updated current_ts with the original value pulled in by the join, then drop the helper columns
+      inputDF2 = joinedDf.withColumn("current_ts_temp", col("current_ts"))
+        .drop("current_ts", "_row_key_temp").withColumn("current_ts", col("current_ts_temp"))
+        .drop("current_ts_temp")
+    }
+
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
     inputDF2.write.format("org.apache.hudi")
@@ -136,12 +174,12 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val emptyIncDF = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME.key, "001")
+      .option(DataSourceReadOptions.END_INSTANTTIME.key, "002")
       .load(basePath)
     assertEquals(0, emptyIncDF.count())
 
     // Upsert an empty dataFrame
-    val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
+    val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -195,7 +233,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .option("hoodie.keep.min.commits", "2")
       .option("hoodie.keep.max.commits", "3")
       .option("hoodie.cleaner.commits.retained", "1")
-      .option("hoodie.metadata.enable","false")
+      .option("hoodie.metadata.enable", "false")
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -205,8 +243,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     // issue delete partition to partition1
     writeRecords(2, dataGenPartition1, writeOperation, basePath)
 
-    val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL))
-    {
+    val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
       200 - partition1RecordCount
     } else {
       100 - partition1RecordCount
@@ -239,15 +276,15 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .option("hoodie.keep.min.commits", "2")
       .option("hoodie.keep.max.commits", "3")
       .option("hoodie.cleaner.commits.retained", "1")
-      .option("hoodie.metadata.enable","false")
+      .option("hoodie.metadata.enable", "false")
       .option(DataSourceWriteOptions.OPERATION.key, writeOperation)
       .mode(SaveMode.Append)
       .save(basePath)
   }
 
-  def assertRecordCount(basePath: String, expectedRecordCount: Long) : Unit = {
+  def assertRecordCount(basePath: String, expectedRecordCount: Long): Unit = {
     val snapshotDF = spark.read.format("org.apache.hudi")
-    .load(basePath + "/*/*/*/*")
+      .load(basePath + "/*/*/*/*")
     assertEquals(expectedRecordCount, snapshotDF.count())
   }
 }
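
For reference, the write-side options the new parameterization exercises for the timestamp-based key generator can be collected in one place. The following standalone Scala sketch is illustrative only: the DataFrame `df`, the helper name, the base path, and the table name are assumptions and not part of this commit; the option keys are the ones used in the test above.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.keygen.TimestampBasedKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch, assuming `df` has a `_row_key` column and an epoch-millis `current_ts` column.
// The table name and base path are placeholders, not values taken from the commit.
def writeWithTimestampKeyGen(df: DataFrame, basePath: String): Unit = {
  df.write.format("org.apache.hudi")
    .option("hoodie.table.name", "hoodie_test_table") // hypothetical table name
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, classOf[TimestampBasedKeyGenerator].getName)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "current_ts")
    .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") // interpret current_ts as epoch millis
    .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")  // partition path rendered as yyyyMMdd
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)
}

For ComplexKeyGenerator the test instead sets DataSourceWriteOptions.RECORDKEY_FIELD to the comma-separated field list "_row_key, pii_col" and leaves the partition path options unchanged.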