You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:46 UTC

[hudi] 19/20: add DropPartitionsProcedure

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7588b918de23f12e098fdde6acac97dfd5c559c2
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Fri Feb 10 15:22:23 2023 +0800

    add DropPartitionsProcedure
---
 .../procedures/BackupInvalidParquetProcedure.scala |   2 +-
 .../procedures/DropPartitionsProcedure.scala       | 118 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedure/TestDropPartitionsProcedure.scala    |  67 ++++++++++++
 4 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index f7b50bdc3d4..e0963baa471 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 import scala.collection.JavaConversions.asScalaBuffer
-import scala.jdk.CollectionConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters.seqAsJavaListConverter
 
 class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
new file mode 100644
index 00000000000..b194305e6c5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.types._
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Stored procedure `drop_partitions`: drops partitions of a Hudi table, selected either by a predicate
+ * over the partition columns or by an explicit list of partition specs.
+ *
+ * Each partition is dropped with its own `ALTER TABLE ... DROP PARTITION (...)` statement and reported
+ * as one `(result, partition)` output row.
+ */
+class DropPartitionsProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "selected_partitions", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Drops the requested partitions.
+   *
+   * `predicate` takes precedence over `selected_partitions`: the table's partitions are pruned with it
+   * and every match is dropped. Otherwise `selected_partitions` is read as a comma-separated list of
+   * partition specs (e.g. `dt='2021-01-01'`), each dropped on its own. With neither, nothing is dropped.
+   *
+   * @param args procedure arguments, validated against [[PARAMETERS]]
+   * @return one `Row(true, partition)` per dropped partition; empty when there was nothing to drop
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
+    val parts = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    // Prefer the identifier the caller passed (it may be database-qualified); the name stored in the
+    // table config is only the fallback for calls that identify the table by path.
+    val targetTable = tableName.map(_.toString).getOrElse(metaClient.getTableConfig.getTableName)
+
+    // Pairs of (partition reported to the caller, partition spec used in the DROP statement).
+    val targets: List[(String, String)] = (parts, predicate) match {
+      case (_, Some(p)) =>
+        val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+        prunePartitionPaths(metaClient, p.toString)
+          .filter(_.nonEmpty)
+          .distinct
+          .map(path => (path, toPartitionSpec(partitionColumns, path)))
+          .toList
+      case (Some(o), _) =>
+        o.toString.split(",").map(_.trim).filter(_.nonEmpty).map(spec => (spec, spec)).toList
+      case _ => Nil
+    }
+
+    if (targets.isEmpty) {
+      logInfo("No partition to drop")
+    } else {
+      logInfo(s"Drop partitions : ${targets.map(_._1).mkString(",")}")
+    }
+
+    targets.map { case (label, spec) =>
+      val dropSql = s"ALTER TABLE $targetTable DROP PARTITION ($spec)"
+      logInfo(s"dropSql: $dropSql")
+      spark.sql(dropSql)
+      Row(true, label)
+    }
+  }
+
+  override def build: Procedure = new DropPartitionsProcedure()
+
+  /**
+   * Prunes the table's partitions with `predicate` and returns them as one comma-joined string in which
+   * the `/` between partition levels is also replaced by `,`.
+   *
+   * Kept for backward compatibility. The result cannot be split back into partitions when the table has
+   * more than one partition level, which is why [[call]] does not use it.
+   *
+   * Fails with "Only partition predicates are allowed" if `predicate` references non-partition columns.
+   */
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String =
+    prunePartitionPaths(metaClient, predicate).map(_.replaceAll("/", ",")).toSet.mkString(",")
+
+  /**
+   * Resolves `predicate` against the table schema and returns the relative paths of the partitions it
+   * selects. Fails with "Only partition predicates are allowed" if it references non-partition columns.
+   */
+  private def prunePartitionPaths(metaClient: HoodieTableMetaClient, predicate: String): Seq[String] = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    val condition = resolveExpr(sparkSession, predicate, tableSchema)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
+      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
+    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
+
+    // Get all partitions and prune partition by predicates
+    hoodieFileIndex.getPartitionPaths(partitionPredicates).map(_.getPath)
+  }
+
+  /**
+   * Turns a relative partition path into the body of a `PARTITION (...)` clause.
+   *
+   * `col=value` segments (hive-style paths) keep their column name. Bare segments are paired with the
+   * partition columns by position when the counts line up; otherwise they are passed through untouched.
+   * Values are emitted as quoted string literals so that dates and other non-numeric values parse.
+   */
+  private def toPartitionSpec(partitionColumns: Array[String], partitionPath: String): String = {
+    val segments = partitionPath.split("/").filter(_.nonEmpty)
+    val positional = segments.length == partitionColumns.length
+    segments.zipWithIndex.map { case (segment, idx) =>
+      segment.split("=", 2) match {
+        case Array(column, value) => s"$column=${quoteLiteral(value)}"
+        case _ if positional => s"${partitionColumns(idx)}=${quoteLiteral(segment)}"
+        case _ => segment
+      }
+    }.mkString(", ")
+  }
+
+  /** Renders `value` as a single-quoted SQL string literal, escaping backslashes and quotes. */
+  private def quoteLiteral(value: String): String =
+    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
+}
+
+/** Registration hooks for [[DropPartitionsProcedure]]. */
+object DropPartitionsProcedure {
+  val NAME = "drop_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new DropPartitionsProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5d945ecbfdb..f54db97b227 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
       ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
+      ,(DropPartitionsProcedure.NAME, DropPartitionsProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
new file mode 100644
index 00000000000..a6fb2be45ee
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+/** Covers the `drop_partitions` procedure: data-predicate rejection and predicate-based dropping. */
+class TestDropPartitionsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call drop_partitions Procedure With single-partition Pruning") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+
+        // One record per partition: ts=1000, ts=1001 and ts=1002.
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+        // A predicate that also touches a data column must be rejected.
+        checkException(
+          s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L and id = 10')"
+        )("Only partition predicates are allowed")
+
+        // Drop the two partitions selected by a pure partition predicate.
+        val dropped = spark.sql(s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L')")
+          .collect()
+          .map(row => (row.getBoolean(0), row.getString(1)))
+        assertResult(2)(dropped.length)
+        assert(dropped.forall(_._1))
+        assertResult(Set("ts=1000", "ts=1001"))(dropped.map(_._2).toSet)
+
+        // Only the record of the surviving partition may still be readable.
+        val remainingIds = spark.sql(s"select id from $tableName").collect().map(_.getInt(0)).toList
+        assertResult(List(3))(remainingIds)
+      }
+    }
+  }
+}