You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by we...@apache.org on 2021/08/27 23:29:17 UTC
[hudi] branch master updated: [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
This is an automated email from the ASF dual-hosted git repository.
wenningd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6943004 [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
new 69cbcc9 Merge pull request #3541 from rahil-c/rahil-c/HUDI-2359
6943004 is described below
commit 694300477f61a9169c15cac1ddf67368dbf5dd1b
Author: Rahil Chertara <rc...@amazon.com>
AuthorDate: Sun Aug 22 21:55:11 2021 -0700
[HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
---
.../apache/hudi/functional/TestCOWDataSource.scala | 27 ++++++++++++++++
.../apache/hudi/functional/TestMORDataSource.scala | 37 ++++++++++++++++++++++
2 files changed, 64 insertions(+)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cbd05eb..efc1430 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -782,6 +782,33 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
}
+ @Test
+ def testHoodieIsDeletedCOW(): Unit = { // Re-writes a few rows flagged _hoodie_is_deleted = true and checks the row count read back drops by that many.
+ val numRecords = 100
+ val numRecordsToDelete = 2
+ val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).toList
+ val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+ df0.write.format("org.apache.hudi") // initial write of all generated records
+ .options(commonOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF0 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(numRecords, snapshotDF0.count()) // baseline: every written record is read back
+
+ val df1 = snapshotDF0.limit(numRecordsToDelete) // rows that will be re-written as deletes
+ val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*) // strip every column prefixed _hoodie_ (presumably Hudi metadata columns; confirm)
+ val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)) // flag added after the drop, since its own name matches the _hoodie_ prefix filter
+ df2.write.format("org.apache.hudi") // second write, appended to the same basePath
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count()) // flagged rows must no longer be counted
+ }
+
def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1bd1c93..f9409e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -783,4 +784,40 @@ class TestMORDataSource extends HoodieClientTestBase {
val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
assertEquals(true, fs.listStatus(tempPath).isEmpty)
}
+
+ @Test
+ def testHoodieIsDeletedMOR(): Unit = { // Same delete-flag check as the COW variant, written with the MOR table type option and read with the snapshot query type.
+ val numRecords = 100
+ val numRecordsToDelete = 2
+ val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+ val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
+ val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+ inputDF0.write.format("org.apache.hudi") // initial write of all generated records
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // inline compaction option explicitly set to false for this write
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF0 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(numRecords, snapshotDF0.count()) // baseline: every written record is read back
+
+ val df1 = snapshotDF0.limit(numRecordsToDelete) // rows that will be re-written as deletes
+ val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*) // strip every column prefixed _hoodie_ (presumably Hudi metadata columns; confirm)
+
+ val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)) // flag added after the drop, since its own name matches the _hoodie_ prefix filter
+ df2.write.format("org.apache.hudi") // second write: no OPERATION override here, so it uses whatever commonOpts or the default supplies (confirm)
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count()) // flagged rows must no longer be counted
+ }
}