Posted to commits@hudi.apache.org by we...@apache.org on 2021/08/27 23:29:17 UTC

[hudi] branch master updated: [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes

This is an automated email from the ASF dual-hosted git repository.

wenningd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6943004  [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
     new 69cbcc9  Merge pull request #3541 from rahil-c/rahil-c/HUDI-2359
6943004 is described below

commit 694300477f61a9169c15cac1ddf67368dbf5dd1b
Author: Rahil Chertara <rc...@amazon.com>
AuthorDate: Sun Aug 22 21:55:11 2021 -0700

    [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
---
 .../apache/hudi/functional/TestCOWDataSource.scala | 27 ++++++++++++++++
 .../apache/hudi/functional/TestMORDataSource.scala | 37 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)
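
For context on what these tests exercise: `_hoodie_is_deleted` is Hudi's soft-delete column. When an upserted record carries a boolean `_hoodie_is_deleted` field set to true, the default record payload resolves the merge as a delete, so the key no longer appears in subsequent snapshot reads. The pattern both tests follow boils down to the sketch below; `toDelete`, `basePath`, and `commonOpts` mirror the fixtures in the diff and are illustrative assumptions, not part of the commit:

    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.BooleanType
    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Turn a subset of previously written rows into delete markers and upsert
    // them back into the same table.
    def softDelete(toDelete: DataFrame, basePath: String, commonOpts: Map[String, String]): Unit = {
      // Hudi meta columns (_hoodie_commit_time, _hoodie_record_key, ...) must not
      // be written back, so drop every column prefixed with "_hoodie_".
      val stripped = toDelete.drop(toDelete.columns.filter(_.startsWith("_hoodie_")): _*)
      // Setting _hoodie_is_deleted = true makes the upsert act as a delete.
      val tombstones = stripped.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
      tombstones.write.format("org.apache.hudi")
        .options(commonOpts)
        .mode(SaveMode.Append)
        .save(basePath)
    }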

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cbd05eb..efc1430 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -782,6 +782,33 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
   }
 
+  @Test
+  def testHoodieIsDeletedCOW(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).toList
+    val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    df0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
+
   def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
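
The MOR variant that follows adds two wrinkles: the base data is written with bulk_insert and inline compaction is disabled, so the soft deletes land in log files rather than rewritten base files, and the reads pin the query type to snapshot so those log records are merged in at query time. For contrast, a read-optimized query would not see the uncompacted deletes; a minimal sketch (not part of the commit, reusing `basePath` from the tests):

    // Read-optimized queries serve only compacted base files, so rows deleted
    // via log-file records still show up until compaction runs.
    val readOptimizedDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*/*/*/*")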
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1bd1c93..f9409e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -783,4 +784,40 @@ class TestMORDataSource extends HoodieClientTestBase {
     val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
     assertEquals(true, fs.listStatus(tempPath).isEmpty)
   }
+
+  @Test
+  def testHoodieIsDeletedMOR(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+    val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
+    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    inputDF0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
 }
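
As a closing usage note: tagging rows with `_hoodie_is_deleted` is convenient when the delete flag already travels with the data. When it does not, the same write path also supports a dedicated delete operation; a minimal sketch under the same assumptions as above (`dropDf`, `commonOpts`, `basePath` as in the tests):

    // Alternative hard-delete path: issue the rows with operation = "delete"
    // instead of adding a _hoodie_is_deleted column. The DataFrame only needs
    // the record key and partition path fields to be present.
    dropDf.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath)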