You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:55 UTC
[hudi] 17/17: add DeleteFsFileProcedure
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a3b0b5af8ac03d0b5940ca1d77bf9b71f9040eb
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Mon Jan 30 12:44:47 2023 +0800
add DeleteFsFileProcedure
---
.../command/procedures/DeleteFsFileProcedure.scala | 80 ++++++++++++++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../hudi/procedure/TestDeleteFsFileProcedure.scala | 47 +++++++++++++
3 files changed, 128 insertions(+)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala
new file mode 100644
index 00000000000..35f43ff1733
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+/** Procedure that recursively deletes every file system entry matching a path or glob. */
+class DeleteFsFileProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("file", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Deletes each match of `path`; returns one (result, file) row per match, none if no match.
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val path = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val delFilePath: Path = new Path(path)
+    val fs = FSUtils.getFs(delFilePath, jsc.hadoopConfiguration())
+
+    // globStatus yields null, not an empty array, for a non-glob path that does not exist.
+    val matches = Option(fs.globStatus(delFilePath)).getOrElse(Array.empty[FileStatus])
+    matches.toList.map { matched: FileStatus =>
+      val target = matched.getPath
+      val deleted = try {
+        fs.delete(target, true)
+      } catch {
+        // Best effort: report the failure as result = false and continue with the other matches.
+        case scala.util.control.NonFatal(e) =>
+          System.err.println(s"delete $target failed due to $e")
+          false
+      }
+      Row(deleted, target.toString)
+    }
+  }
+
+  override def build: Procedure = new DeleteFsFileProcedure()
+}
+
+/** Companion exposing the SQL name and the builder supplier used to register the procedure. */
+object DeleteFsFileProcedure {
+  val NAME: String = "delete_fs_file"
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    def get(): ProcedureBuilder = new DeleteFsFileProcedure
+  }
+}
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index c4a8ce81f2d..5d945ecbfdb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -90,6 +90,7 @@ object HoodieProcedures {
,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
,(HelpProcedure.NAME, HelpProcedure.builder)
,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
+ ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala
new file mode 100644
index 00000000000..8834e21a259
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+/** Checks that `call delete_fs_file` returns at least one row when given a table path. */
+class TestDeleteFsFileProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call delete_fs_file Procedure") {
+    withTempDir { baseDir =>
+      val tableName = generateTableName
+      val tablePath = s"${baseDir.getCanonicalPath}/$tableName"
+      // Create a Hudi table so that the target location exists on the file system.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // The procedure emits one row per matched path, so an existing location yields output.
+      val deletedRows = spark.sql(s"""call delete_fs_file(path => '$tablePath')""").collect()
+      assertResult(true)(deletedRows.length > 0)
+    }
+  }
+}