Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/14 06:53:53 UTC
[hudi] branch master updated: [HUDI-2307] When using delete_partition with ds should not rely on the primary key (#3469)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7da6cb [HUDI-2307] When using delete_partition with ds should not rely on the primary key (#3469)
b7da6cb is described below
commit b7da6cb33d27002525e40913dd63f077aeba26f0
Author: liujinhui <96...@qq.com>
AuthorDate: Sat Aug 14 14:53:39 2021 +0800
[HUDI-2307] When using delete_partition with ds should not rely on the primary key (#3469)
- Co-authored-by: Sivabalan Narayanan <n....@gmail.com>
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 5 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +-
.../apache/hudi/HoodieSparkSqlWriterSuite.scala | 115 +++++++++++----------
style/scalastyle.xml | 2 +-
4 files changed, 75 insertions(+), 58 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a534ac5..36c0493 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -325,6 +325,11 @@ object DataSourceWriteOptions {
@Deprecated
val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
+ val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.write.partitions.to.delete")
+ .noDefaultValue()
+ .withDocumentation("Comma separated list of partitions to delete")
+
val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.retry.count")
.defaultValue("3")
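For reference, a minimal usage sketch of the new option from the Spark datasource write path (the table name, base path and partition values below are hypothetical, not from this commit): with hoodie.datasource.write.partitions.to.delete set, the partitions to drop are taken from the config instead of being derived from the record keys of the incoming DataFrame.

    import org.apache.spark.sql.SaveMode

    // df can be any DataFrame handle for the write call; its rows are not used
    // to compute the partition list when the option below is supplied.
    df.write
      .format("hudi")
      .option("hoodie.table.name", "hudi_trips")
      .option("hoodie.datasource.write.operation", "delete_partition")
      .option("hoodie.datasource.write.partitions.to.delete",
        "americas/united_states/san_francisco,americas/brazil/sao_paulo")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_trips")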
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f41df94..2c8d33e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,8 +29,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
@@ -192,7 +192,12 @@ object HoodieSparkSqlWriter {
}
// Get list of partitions to delete
- val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+ val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
+ val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
+ java.util.Arrays.asList(partitionColsToDelete:_*)
+ } else {
+ genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+ }
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName,
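The hunk above only changes how the partition list is resolved. A simplified standalone sketch of that fallback (not the actual writer code; RDD and key-generator plumbing omitted):

    // If the option is present, split its comma-separated value; otherwise fall back
    // to deriving distinct partition paths from the record keys, which requires a
    // usable primary key in the incoming data.
    def resolvePartitionsToDelete(params: Map[String, String],
                                  derivedFromRecordKeys: => Seq[String]): Seq[String] =
      params.get("hoodie.datasource.write.partitions.to.delete") match {
        case Some(csv) => csv.split(",").map(_.trim).toSeq
        case None      => derivedFromRecordKeys
      }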
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index e767f8a..1ff30e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -679,61 +679,68 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}
- test("test delete partitions") {
- initSparkContext("test_delete_partitions")
- val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
- try {
- val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
- val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
- val schema = DataSourceTestUtils.getStructTypeExampleSchema
- val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
- val records = DataSourceTestUtils.generateRandomRows(10)
- val recordsSeq = convertRowListToSeq(records)
- val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- // write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+ List(true, false)
+ .foreach(usePartitionsToDeleteConfig => {
+ test("test delete partitions for " + usePartitionsToDeleteConfig) {
+ initSparkContext("test_delete_partitions_" + usePartitionsToDeleteConfig)
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+ try {
+ val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+ val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+ var fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val records = DataSourceTestUtils.generateRandomRows(10)
+ val recordsSeq = convertRowListToSeq(records)
+ val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+ // write to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
- val snapshotDF1 = spark.read.format("org.apache.hudi")
- .load(path.toAbsolutePath.toString + "/*/*/*/*")
- assertEquals(10, snapshotDF1.count())
- // remove metadata columns so that expected and actual DFs can be compared as is
- val trimmedDf1 = dropMetaFields(snapshotDF1)
- assert(df1.except(trimmedDf1).count() == 0)
-
- // issue updates so that log files are created for MOR table
- var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
- var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
- // write updates to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
- val snapshotDF2 = spark.read.format("org.apache.hudi")
- .load(path.toAbsolutePath.toString + "/*/*/*/*")
- assertEquals(10, snapshotDF2.count())
+ val snapshotDF1 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(10, snapshotDF1.count())
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf1 = dropMetaFields(snapshotDF1)
+ assert(df1.except(trimmedDf1).count() == 0)
- // remove metadata columns so that expected and actual DFs can be compared as is
- val trimmedDf2 = dropMetaFields(snapshotDF2)
- // ensure 2nd batch of updates matches.
- assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
-
- // delete partitions
- val recordsToDelete = df1.filter(entry => {
- val partitionPath : String = entry.getString(1)
- partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
- })
- val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
-
- val snapshotDF3 = spark.read.format("org.apache.hudi")
- .load(path.toAbsolutePath.toString + "/*/*/*/*")
- assertEquals(0, snapshotDF3.filter(entry => {
- val partitionPath = entry.getString(3)
- !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
- }).count())
- } finally {
- spark.stop()
- FileUtils.deleteDirectory(path.toFile)
- }
- }
+ // issue updates so that log files are created for MOR table
+ var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+ var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+ // write updates to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(10, snapshotDF2.count())
+
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf2 = dropMetaFields(snapshotDF2)
+ // ensure 2nd batch of updates matches.
+ assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+ if ( usePartitionsToDeleteConfig) {
+ fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ }
+ // delete partitions contains the primary key
+ val recordsToDelete = df1.filter(entry => {
+ val partitionPath : String = entry.getString(1)
+ partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+ partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+ })
+ val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
+
+ val snapshotDF3 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(0, snapshotDF3.filter(entry => {
+ val partitionPath = entry.getString(3)
+ !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+ }).count())
+ } finally {
+ spark.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
+ }
+ })
def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index fa2c4d3..89306f3 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -27,7 +27,7 @@
<check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
<check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true">
<parameters>
- <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+ <parameter name="maxFileLength"><![CDATA[900]]></parameter>
</parameters>
</check>
<check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>