You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:30 UTC
[hudi] 03/20: improve BackupInvalidParquetProcedure
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9a9fd985f2f7d251af848b7034c5b4b1bc0ac8c9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sat Feb 4 15:03:35 2023 +0800
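improve BackupInvalidParquetProcedure: add optional `is_partition` (scan a single partition path) and `parallelism` (repartition the listed data files before validation) parameters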
---
.../procedures/BackupInvalidParquetProcedure.scala | 23 +++++++++++++++++-----
.../TestBackupInvalidParquetProcedure.scala | 19 +++++++++++-------
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index 5c1234b7a27..f7b50bdc3d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -31,10 +31,14 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.function.Supplier
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "path", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "is_partition", DataTypes.BooleanType, false),
+ ProcedureParameter.optional(2, "parallelism", DataTypes.IntegerType, 100)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -50,17 +54,26 @@ class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder
super.checkArgs(PARAMETERS, args)
val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+ val isPartition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+ val parallelism = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+
val backupPath = new Path(srcPath, ".backup").toString
val fs = FSUtils.getFs(backupPath, jsc.hadoopConfiguration())
fs.mkdirs(new Path(backupPath))
- val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
- val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+ val partitionPaths: java.util.List[Path] = if (isPartition) {
+ List(new Path(srcPath)).asJava
+ } else {
+ FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+ .map(part => FSUtils.getPartitionPath(srcPath, part))
+ .toList.asJava
+ }
+ val javaRdd: JavaRDD[Path] = jsc.parallelize(partitionPaths, partitionPaths.size())
val invalidParquetCount = javaRdd.rdd.map(part => {
val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
- FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
- }).flatMap(_.toList)
+ FSUtils.getAllDataFilesInPartition(fs, part)
+ }).flatMap(_.toList).repartition(parallelism)
.filter(status => {
val filePath = status.getPath
var isInvalid = false
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
index 2e54f40fb3f..e97574afc85 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
@@ -55,17 +55,23 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
out1.write(1)
out1.close()
- val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
- val out2 = fs.create(invalidPath2)
- out2.write(1)
- out2.close()
+ assertResult(1) {
+ spark.sql(
+ s"""call show_invalid_parquet(path => '$basePath')""".stripMargin)
+ .collect().length
+ }
val result1 = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
- assertResult(2) {
+ s"""call backup_invalid_parquet(path => '$basePath/ts=1500', is_partition => true)""".stripMargin).collect()
+ assertResult(1) {
result1.length
}
+ val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+ val out2 = fs.create(invalidPath2)
+ out2.write(1)
+ out2.close()
+
val result2 = spark.sql(
s"""call backup_invalid_parquet(path => '$basePath')""".stripMargin).collect()
assertResult(1) {
@@ -77,7 +83,6 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
assertResult(0) {
result3.length
}
-
}
}
}