Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:30 UTC

[hudi] 03/20: improve BackupInvalidParquetProcedure

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9a9fd985f2f7d251af848b7034c5b4b1bc0ac8c9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sat Feb 4 15:03:35 2023 +0800

    improve BackupInvalidParquetProcedure
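
    The change adds two optional parameters to backup_invalid_parquet:
    is_partition (default false), which treats `path` as a single partition
    directory rather than a table base path, and parallelism (default 100),
    which controls how many Spark tasks run the per-file validity checks.
    A usage sketch (the table path below is hypothetical; the parameter
    names and defaults come from this patch):

        spark.sql("call backup_invalid_parquet(path => '/tmp/hudi/trips')")
        spark.sql("call backup_invalid_parquet(path => '/tmp/hudi/trips/ts=1500', " +
          "is_partition => true, parallelism => 50)")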
---
 .../procedures/BackupInvalidParquetProcedure.scala | 23 +++++++++++++++++-----
 .../TestBackupInvalidParquetProcedure.scala        | 19 +++++++++++-------
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index 5c1234b7a27..f7b50bdc3d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -31,10 +31,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
 
 class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "is_partition", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(2, "parallelism", DataTypes.IntegerType, 100)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -50,17 +54,26 @@ class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder
     super.checkArgs(PARAMETERS, args)
 
     val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val isPartition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+
     val backupPath = new Path(srcPath, ".backup").toString
     val fs = FSUtils.getFs(backupPath, jsc.hadoopConfiguration())
     fs.mkdirs(new Path(backupPath))
 
-    val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
     val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+    val partitionPaths: java.util.List[Path] = if (isPartition) {
+      List(new Path(srcPath)).asJava
+    } else {
+      FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+        .map(part => FSUtils.getPartitionPath(srcPath, part))
+        .toList.asJava
+    }
+    val javaRdd: JavaRDD[Path] = jsc.parallelize(partitionPaths, partitionPaths.size())
     val invalidParquetCount = javaRdd.rdd.map(part => {
       val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
-      FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
-    }).flatMap(_.toList)
+      FSUtils.getAllDataFilesInPartition(fs, part)
+    }).flatMap(_.toList).repartition(parallelism)
       .filter(status => {
         val filePath = status.getPath
         var isInvalid = false
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
index 2e54f40fb3f..e97574afc85 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
@@ -55,17 +55,23 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
       out1.write(1)
       out1.close()
 
-      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
-      val out2 = fs.create(invalidPath2)
-      out2.write(1)
-      out2.close()
+      assertResult(1) {
+        spark.sql(
+          s"""call show_invalid_parquet(path => '$basePath')""".stripMargin)
+          .collect().length
+      }
 
       val result1 = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
-      assertResult(2) {
+        s"""call backup_invalid_parquet(path => '$basePath/ts=1500', is_partition => true)""".stripMargin).collect()
+      assertResult(1) {
         result1.length
       }
 
+      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+      val out2 = fs.create(invalidPath2)
+      out2.write(1)
+      out2.close()
+
       val result2 = spark.sql(
         s"""call backup_invalid_parquet(path => '$basePath')""".stripMargin).collect()
       assertResult(1) {
@@ -77,7 +83,6 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
       assertResult(0) {
         result3.length
       }
-
     }
   }
 }
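
For reference, below is a condensed, self-contained sketch of the listing flow
this patch introduces. It is not the committed code: `jsc`, `srcPath`,
`isPartition` and `parallelism` stand in for the procedure's Spark context and
arguments, and the import paths are assumed from Hudi 0.12.

    import org.apache.hadoop.fs.Path
    import org.apache.hudi.client.common.HoodieSparkEngineContext
    import org.apache.hudi.common.config.SerializableConfiguration
    import org.apache.hudi.common.fs.FSUtils

    import scala.collection.JavaConverters._

    // Treat srcPath itself as the only partition when is_partition is set,
    // otherwise enumerate every partition under the table base path.
    val partitionPaths: Seq[Path] =
      if (isPartition) {
        Seq(new Path(srcPath))
      } else {
        FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
          .asScala
          .map(part => FSUtils.getPartitionPath(srcPath, part))
      }

    val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())

    // One task per partition does the directory listing; the repartition
    // then spreads the per-file validity checks over `parallelism` tasks.
    val dataFiles = jsc
      .parallelize(partitionPaths.asJava, partitionPaths.size)
      .rdd
      .flatMap { part =>
        val fs = FSUtils.getFs(part, serHadoopConf.get())
        FSUtils.getAllDataFilesInPartition(fs, part).toList
      }
      .repartition(parallelism)

Repartitioning after the flatMap decouples the check parallelism from the
number of partitions, which matters when a table has few partitions but many
files per partition.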