You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:55 UTC

[hudi] 17/17: add DeleteFsFileProcedure

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2a3b0b5af8ac03d0b5940ca1d77bf9b71f9040eb
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Mon Jan 30 12:44:47 2023 +0800

    add DeleteFsFileProcedure
---
 .../command/procedures/DeleteFsFileProcedure.scala | 80 ++++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../hudi/procedure/TestDeleteFsFileProcedure.scala | 47 +++++++++++++
 3 files changed, 128 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala
new file mode 100644
index 00000000000..35f43ff1733
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class DeleteFsFileProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("file", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val path = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val delFilePath: Path = new Path(path)
+
+    val fs = FSUtils.getFs(delFilePath, jsc.hadoopConfiguration())
+    val status: Array[FileStatus] = fs.globStatus(delFilePath)
+    val rows: java.util.List[Row] = new java.util.ArrayList[Row]()
+
+    if (status.nonEmpty) {
+      for (i <- status.indices) {
+        var result = false
+
+        try {
+          result = fs.delete(status(i).getPath, true)
+        } catch {
+          case e: Exception => System.err.println(s"delete ${status(i).getPath} failed due to", e)
+        }
+
+        rows.add(Row(result, status(i).getPath.toString))
+      }
+    }
+
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new DeleteFsFileProcedure()
+}
+
+object DeleteFsFileProcedure {
+  val NAME = "delete_fs_file"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new DeleteFsFileProcedure()
+  }
+}
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index c4a8ce81f2d..5d945ecbfdb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -90,6 +90,7 @@ object HoodieProcedures {
       ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
+      ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala
new file mode 100644
index 00000000000..8834e21a259
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestDeleteFsFileProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call delete_fs_file Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      val result = spark.sql(s"""call delete_fs_file(path => '$tablePath')""").collect()
+      assertResult(true) {
+        result.length > 0
+      }
+    }
+  }
+}