You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:46 UTC
[hudi] 19/20: add DropPartitionsProcedure
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7588b918de23f12e098fdde6acac97dfd5c559c2
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Fri Feb 10 15:22:23 2023 +0800
add DropPartitionsProcedure
---
.../procedures/BackupInvalidParquetProcedure.scala | 2 +-
.../procedures/DropPartitionsProcedure.scala | 118 +++++++++++++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../procedure/TestDropPartitionsProcedure.scala | 67 ++++++++++++
4 files changed, 187 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index f7b50bdc3d4..e0963baa471 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.function.Supplier
import scala.collection.JavaConversions.asScalaBuffer
-import scala.jdk.CollectionConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters.seqAsJavaListConverter
class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
new file mode 100644
index 00000000000..b194305e6c5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.types._
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL procedure `drop_partitions`: drops Hudi table partitions chosen either by a
+ * partition-only `predicate` or by an explicit comma-separated `selected_partitions` list,
+ * issuing one `ALTER TABLE ... DROP PARTITION` statement per partition.
+ */
+class DropPartitionsProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "selected_partitions", DataTypes.StringType, None)
+  )
+
+  // One output row per dropped partition: (result, partition path).
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Resolves the target table from `table` / `path`, selects partitions and drops each of them.
+   *
+   * `selected_partitions` is a comma-separated list of partition paths, e.g. `ts=1000,ts=1001`;
+   * multi-level paths use `/` between levels, e.g. `dt=2021-01-01/hh=10`. When both `predicate`
+   * and `selected_partitions` are given, `predicate` takes precedence.
+   *
+   * @return one `(true, partitionPath)` row per dropped partition; a failing drop propagates
+   *         the exception raised by `spark.sql`
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
+    val parts = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    if (predicate.isDefined && parts.isDefined) {
+      logWarning("Both predicate and selected_partitions are given; selected_partitions is ignored")
+    }
+    val selectedPartitions: String = (parts, predicate) match {
+      case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+      case (Some(o), _) => o.asInstanceOf[String]
+      case _ => ""
+    }
+
+    // Trim entries and skip blanks (e.g. "a,,b" or a trailing comma), which would otherwise
+    // produce an invalid `DROP PARTITION ()` statement.
+    val partitionPaths: List[String] =
+      selectedPartitions.split(",").iterator.map(_.trim).filter(_.nonEmpty).toList
+    if (partitionPaths.nonEmpty) {
+      logInfo(s"Drop partitions : $selectedPartitions")
+    } else {
+      logInfo("No partition to drop")
+    }
+
+    // Prefer the caller-supplied identifier, which may be database-qualified; the name from the
+    // table config is presumably unqualified and would resolve against the current database.
+    val targetTable = tableName.map(_.asInstanceOf[String])
+      .getOrElse(metaClient.getTableConfig.getTableName)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+
+    partitionPaths.map { part =>
+      val dropSql = s"ALTER TABLE $targetTable DROP PARTITION (${toPartitionSpec(part, partitionColumns)})"
+      logInfo(s"dropSql: $dropSql")
+      spark.sql(dropSql)
+      Row(true, part)
+    }
+  }
+
+  override def build: Procedure = new DropPartitionsProcedure()
+
+  /**
+   * Resolves `predicate` against the table schema and returns the matching partition paths as a
+   * comma-separated string, e.g. `ts=1000,ts=1001` or `dt=2021-01-01/hh=10,dt=2021-01-01/hh=11`.
+   *
+   * Each path keeps its `/` level separators, so a multi-level path stays a single entry when
+   * `call` splits the result on `,`. Fails through `checkArgument` ("Only partition predicates
+   * are allowed") when the predicate references non-partition columns.
+   */
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    val condition = resolveExpr(sparkSession, predicate, tableSchema)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
+      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
+    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
+
+    // Get all partitions, prune them by the predicates, and sort so the drop order is deterministic.
+    hoodieFileIndex.getPartitionPaths(partitionPredicates)
+      .map(_.getPath)
+      .filter(_.nonEmpty)
+      .distinct
+      .sorted
+      .mkString(",")
+  }
+
+  /**
+   * Converts a partition path such as `dt=2021-01-01/hh=10` (or a non hive-style `2021-01-01/10`)
+   * into the body of a Spark SQL partition spec such as `dt='2021-01-01', hh='10'`.
+   *
+   * Values are quoted because unquoted non-numeric values (e.g. dates) are not valid SQL constants.
+   * A segment without `=` is paired with the partition column at the same position; a segment that
+   * has no matching column is passed through unchanged.
+   */
+  private def toPartitionSpec(partitionPath: String, partitionColumns: Array[String]): String = {
+    partitionPath.split("/").map(_.trim).filter(_.nonEmpty).zipWithIndex.map { case (segment, idx) =>
+      val eq = segment.indexOf('=')
+      if (eq >= 0) {
+        s"${segment.substring(0, eq).trim}=${quoteValue(segment.substring(eq + 1).trim)}"
+      } else if (idx < partitionColumns.length) {
+        s"${partitionColumns(idx)}=${quoteValue(segment)}"
+      } else {
+        segment
+      }
+    }.mkString(", ")
+  }
+
+  /** Wraps `value` in single quotes (escaping backslashes and quotes) unless it is already quoted. */
+  private def quoteValue(value: String): String = {
+    val alreadyQuoted = value.length >= 2 &&
+      ((value.startsWith("'") && value.endsWith("'")) || (value.startsWith("\"") && value.endsWith("\"")))
+    if (alreadyQuoted) {
+      value
+    } else {
+      "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
+    }
+  }
+}
+
+object DropPartitionsProcedure {
+  /** Name used to invoke the procedure, i.e. `call drop_partitions(...)`. */
+  val NAME: String = "drop_partitions"
+
+  /** Builder supplier that returns a new [[DropPartitionsProcedure]] on every `get()`. */
+  def builder: Supplier[ProcedureBuilder] = {
+    new Supplier[ProcedureBuilder] {
+      override def get(): ProcedureBuilder = new DropPartitionsProcedure()
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5d945ecbfdb..f54db97b227 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
,(HelpProcedure.NAME, HelpProcedure.builder)
,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
+ ,(DropPartitionsProcedure.NAME, DropPartitionsProcedure.builder)
)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
new file mode 100644
index 00000000000..a6fb2be45ee
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestDropPartitionsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call drop_partitions Procedure With single-partition Pruning") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+        // A predicate that references a data column must be rejected.
+        checkException(
+          s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L and id = 10')"
+        )("Only partition predicates are allowed")
+
+        // Drop the partitions matched by a partition predicate. Sort the rows so the assertion
+        // does not depend on the order in which partitions are dropped.
+        val droppedByPredicate = spark.sql(
+          s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L')")
+          .collect()
+          .map(row => Seq(row.getBoolean(0), row.getString(1)))
+          .sortBy(_.toString)
+          .toSeq
+        assertResult(Seq(Seq(true, "ts=1000"), Seq(true, "ts=1001")))(droppedByPredicate)
+        assertResult(Seq(3))(spark.sql(s"select id from $tableName").collect().map(_.getInt(0)).toSeq)
+
+        // Drop an explicitly listed partition.
+        val droppedBySelection = spark.sql(
+          s"call drop_partitions(table => '$tableName', selected_partitions => 'ts=1002')")
+          .collect()
+          .map(row => Seq(row.getBoolean(0), row.getString(1)))
+          .toSeq
+        assertResult(Seq(Seq(true, "ts=1002")))(droppedBySelection)
+        assertResult(0L)(spark.sql(s"select id from $tableName").count())
+      }
+    }
+  }
+}