Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:17 UTC

[hudi] 07/45: add 'backup_invalid_parquet' procedure

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1d029e668bde07f764d3781d51b6c18c6fc025e1
Author: jiimmyzhan <ji...@tencent.com>
AuthorDate: Wed Aug 24 22:27:11 2022 +0800

    add 'backup_invalid_parquet' procedure
---
 .../procedures/BackupInvalidParquetProcedure.scala | 89 ++++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../TestBackupInvalidParquetProcedure.scala        | 83 ++++++++++++++++++++
 3 files changed, 173 insertions(+)
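
Once registered, the procedure is invoked through Spark SQL's CALL syntax, as
exercised in the test below. A minimal usage sketch (the table path is a
placeholder, not taken from the commit):

    // hedged usage sketch; '/path/to/hudi_table' is a placeholder base path
    spark.sql("call backup_invalid_parquet(path => '/path/to/hudi_table')").show()
    // returns one row (backup_path string, invalid_parquet_size long): the
    // backup directory '<path>/.backup' and the number of corrupt .parquet
    // files that were renamed into it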

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..fbbb1247fa
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("backup_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("invalid_parquet_size", DataTypes.LongType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val backupPath = new Path(srcPath, ".backup").toString
+    val fs = FSUtils.getFs(backupPath, jsc.hadoopConfiguration())
+    fs.mkdirs(new Path(backupPath))
+
+    val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
+    val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+    val invalidParquetCount = javaRdd.rdd.map(part => {
+      val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+      FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
+    }).flatMap(_.toList)
+      .filter(status => {
+        val filePath = status.getPath
+        var isInvalid = false
+        if (filePath.toString.endsWith(".parquet")) {
+          try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
+            case e: Exception =>
+              isInvalid = e.getMessage.contains("is not a Parquet file")
+              filePath.getFileSystem(serHadoopConf.get()).rename(filePath, new Path(backupPath, filePath.getName))
+          }
+        }
+        isInvalid
+      })
+      .count()
+    Seq(Row(backupPath, invalidParquetCount))
+  }
+
+  override def build = new BackupInvalidParquetProcedure()
+}
+
+object BackupInvalidParquetProcedure {
+  val NAME = "backup_invalid_parquet"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new BackupInvalidParquetProcedure()
+  }
+}
+
+
+
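The core of the procedure above is the footer check: a data file ending in
'.parquet' is treated as invalid when its footer cannot be read. Below is a
standalone sketch of that check, assuming a Hadoop Configuration and
parquet-hadoop on the classpath; the helper name is hypothetical, and note
the procedure itself only counts files whose error message says the file
"is not a Parquet file":

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
    import org.apache.parquet.hadoop.ParquetFileReader

    // Hypothetical helper, not part of the commit: true when the footer
    // cannot be parsed, i.e. the file is not a readable parquet file.
    def isInvalidParquet(conf: Configuration, file: Path): Boolean = {
      try {
        // SKIP_ROW_GROUPS reads only footer metadata, keeping the check cheap
        ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)
        false
      } catch {
        case _: Exception => true
      }
    }
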
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b2bbec8489..0917c2b70e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -80,6 +80,7 @@ object HoodieProcedures {
       ,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
       ,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
       ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
+      ,(BackupInvalidParquetProcedure.NAME, BackupInvalidParquetProcedure.builder)
     )
   }
 }
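
The wiring above is just a name-to-builder entry. A hedged sketch of how such
a mapping resolves the new procedure by name (the registry value and lookup
below are illustrative, not HoodieProcedures' actual internals; only NAME,
builder and build come from the commit):

    import java.util.function.Supplier

    // Illustrative only: map the SQL-facing name to the Supplier that yields
    // a builder, then build a fresh procedure instance on lookup.
    val procedureBuilders: Map[String, Supplier[ProcedureBuilder]] = Map(
      BackupInvalidParquetProcedure.NAME -> BackupInvalidParquetProcedure.builder
    )

    val maybeProcedure = procedureBuilders
      .get("backup_invalid_parquet")
      .map(_.get().build)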
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..2e54f40fb3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+
+class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call backup_invalid_parquet Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$basePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call backup_invalid_parquet(limit => 10)""")(
+        s"Argument: path is required")
+
+      val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+      val invalidPath1 = new Path(basePath, "ts=1000/1.parquet")
+      val out1 = fs.create(invalidPath1)
+      out1.write(1)
+      out1.close()
+
+      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+      val out2 = fs.create(invalidPath2)
+      out2.write(1)
+      out2.close()
+
+      val result1 = spark.sql(
+        s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      assertResult(2) {
+        result1.length
+      }
+
+      val result2 = spark.sql(
+        s"""call backup_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      assertResult(1) {
+        result2.length
+      }
+
+      val result3 = spark.sql(
+        s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      assertResult(0) {
+        result3.length
+      }
+
+    }
+  }
+}
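
A follow-up check one could add after the backup call, not part of the
committed test, is to assert that the two corrupt files actually landed under
the backup directory; a hedged sketch reusing the fs and basePath values from
the test above:

    // Hypothetical extra assertion: both corrupt files should now sit under
    // '<basePath>/.backup' with their original file names.
    val backupDir = new Path(basePath, ".backup")
    val backedUpNames = fs.listStatus(backupDir).map(_.getPath.getName)
    assert(backedUpNames.contains("1.parquet") && backedUpNames.contains("2.parquet"))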