Posted to commits@spark.apache.org by su...@apache.org on 2023/03/09 00:38:58 UTC

[spark] branch branch-3.4 updated: [SPARK-42480][SQL] Improve the performance of drop partitions

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5eb4edf53c4 [SPARK-42480][SQL] Improve the performance of drop partitions
5eb4edf53c4 is described below

commit 5eb4edf53c4bdda9d9ce8847048d195fb2b5c6a2
Author: wecharyu <yu...@gmail.com>
AuthorDate: Wed Mar 8 16:30:01 2023 -0800

    [SPARK-42480][SQL] Improve the performance of drop partitions
    
    ### What changes were proposed in this pull request?
    1. Fetch matching partition names rather than full partition objects when dropping partitions.
    
    ### Why are the changes needed?
    1. Partition names are sufficient to drop partitions.
    2. Fetching only names reduces both the time overhead and the driver memory overhead (see the sketch below).
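    
    A minimal sketch of the idea, using the shim calls from the patch below
    (`shim`, `client`, `hiveTable` and `spec` are the surrounding
    HiveClientImpl names):
    
        // Before: one heavyweight Partition object per matching partition
        val parts = shim.getPartitions(client, hiveTable, spec.asJava)
        val valuesFromObjects = parts.map(_.getValues)
    
        // After: lightweight "k1=v1/k2=v2" strings, parsed locally
        val names = shim.getPartitionNames(client, db, table, spec.asJava, -1)
        val valuesFromNames = names.map(HiveUtils.partitionNameToValues(_).toList.asJava)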
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, we have added a new SQL conf to enable this feature: `spark.sql.hive.dropPartitionByName.enabled`
    
    ### How was this patch tested?
    Add new tests.
    
    Closes #40069 from wecharyu/SPARK-42480.
    
    Authored-by: wecharyu <yu...@gmail.com>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 +++++++++
 .../command/AlterTableDropPartitionSuiteBase.scala | 18 ++++++++++++++++
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 13 +++++++++++
 .../spark/sql/hive/client/HiveClientImpl.scala     | 21 ++++++++++++------
 .../command/AlterTableDropPartitionSuite.scala     | 25 ++++++++++++++++++++++
 5 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 21bf571c494..5a93fdde304 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1227,6 +1227,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val HIVE_METASTORE_DROP_PARTITION_BY_NAME =
+    buildConf("spark.sql.hive.dropPartitionByName.enabled")
+      .doc("When true, Spark will get partition name rather than partition object " +
+           "to drop partition, which can improve the performance of drop partition.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val HIVE_METASTORE_PARTITION_PRUNING =
     buildConf("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -4472,6 +4480,8 @@ class SQLConf extends Serializable with Logging {
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
 
+  def metastoreDropPartitionsByName: Boolean = getConf(HIVE_METASTORE_DROP_PARTITION_BY_NAME)
+
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
   def metastorePartitionPruningInSetThreshold: Int =
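
A hedged usage sketch (assumes a running `spark` session with Hive support):
the new flag is an ordinary session conf, so it can be toggled per session
and read back through the accessor added above:

    // Enable name-based partition dropping for this session (default: false)
    spark.conf.set("spark.sql.hive.dropPartitionByName.enabled", "true")
    // The new SQLConf accessor reflects the session value
    assert(spark.sessionState.conf.metastoreDropPartitionsByName)
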
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index 3f15533ca5f..eaf305414f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -284,4 +284,22 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
       }
     }
   }
+
+  test("SPARK-42480: drop partition when dropPartitionByName enabled") {
+    withSQLConf(SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+      withNamespaceAndTable("ns", "tbl") { t =>
+        sql(s"CREATE TABLE $t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
+        sql(s"ALTER TABLE $t ADD PARTITION (region='=reg1') LOCATION 'loc1'")
+        checkPartitions(t, Map("region" -> "=reg1"))
+        sql(s"ALTER TABLE $t PARTITION (region='=reg1') RENAME TO PARTITION (region='=%reg1')")
+        checkPartitions(t, Map("region" -> "=%reg1"))
+        sql(s"ALTER TABLE $t DROP PARTITION (region='=%reg1')")
+        checkPartitions(t)
+        sql(s"ALTER TABLE $t ADD PARTITION (region='reg?2') LOCATION 'loc2'")
+        checkPartitions(t, Map("region" -> "reg?2"))
+        sql(s"ALTER TABLE $t DROP PARTITION (region='reg?2')")
+        checkPartitions(t)
+      }
+    }
+  }
 }
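
Note: the '=', '%' and '?' characters in these partition values are
deliberate. Hive escapes such characters when encoding a partition spec into
a path-style name, so these specs exercise the unescaping done by
`partitionNameToValues`. A small illustration of the encoding, assuming
Hive's `org.apache.hadoop.hive.common.FileUtils` escaping rules:

    FileUtils.escapePathName("=%reg1")  // "%3D%25reg1"
    FileUtils.escapePathName("reg?2")   // "reg%3F2"
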
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e..0b59011f4e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -28,6 +28,8 @@ import scala.util.Try
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -50,6 +52,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
 
 
 private[spark] object HiveUtils extends Logging {
+  private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r
 
   /** The version of hive used internally by Spark SQL. */
   val builtinHiveVersion: String = HiveVersionInfo.getVersion
@@ -591,4 +594,14 @@ private[spark] object HiveUtils extends Logging {
       table.copy(schema = StructType((dataCols ++ partCols).toArray))
     }
   }
+
+  /**
+   * Extract the partition values from a partition name, e.g., if a partition name is
+   * "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
+   */
+  def partitionNameToValues(name: String): Array[String] = {
+    name.split(Path.SEPARATOR).map {
+      case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
+    }
+  }
 }
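
A quick worked example of the helper above (the second input shows Hive's
percent-encoding of '=' and '%'):

    HiveUtils.partitionNameToValues("region=US/dt=2023-02-18")
    // => Array("US", "2023-02-18")
    HiveUtils.partitionNameToValues("region=%3D%25reg1")
    // => Array("=%reg1"), decoded via FileUtils.unescapePathName
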
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0a83ec2689c..aaa0afc344d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -679,7 +679,6 @@ private[hive] class HiveClientImpl(
       purge: Boolean,
       retainData: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
-    val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
@@ -687,11 +686,21 @@ private[hive] class HiveClientImpl(
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
-        val parts = shim.getPartitions(client, hiveTable, s.asJava)
-        if (parts.isEmpty && !ignoreIfNotExists) {
-          throw new NoSuchPartitionsException(db, table, Seq(s))
+        val dropPartitionByName = SQLConf.get.metastoreDropPartitionsByName
+        if (dropPartitionByName) {
+          val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
+          if (partitionNames.isEmpty && !ignoreIfNotExists) {
+            throw new NoSuchPartitionsException(db, table, Seq(s))
+          }
+          partitionNames.map(HiveUtils.partitionNameToValues(_).toList.asJava)
+        } else {
+          val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
+          val parts = shim.getPartitions(client, hiveTable, s.asJava)
+          if (parts.isEmpty && !ignoreIfNotExists) {
+            throw new NoSuchPartitionsException(db, table, Seq(s))
+          }
+          parts.map(_.getValues)
         }
-        parts.map(_.getValues)
       }.distinct
     val droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
index de995c120e6..05033260351 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
@@ -50,4 +50,29 @@ class AlterTableDropPartitionSuite
       }
     }
   }
+
+  test("SPARK-42480: hive client calls when dropPartitionByName enabled") {
+    Seq(false, true).foreach { statsOn =>
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> statsOn.toString,
+        SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+        withNamespaceAndTable("ns", "tbl") { t =>
+          sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
+          sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+          sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+          sql(s"ALTER TABLE $t ADD PARTITION (part=2)") // empty partition
+          checkHiveClientCalls(expected = if (statsOn) 25 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
+          }
+          checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
+          }
+          sql(s"CACHE TABLE $t")
+          checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
+          }
+        }
+      }
+    }
+  }
 }
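
For reference, a minimal end-to-end sketch of the new code path from a Spark
session (table name and schema are illustrative only):

    spark.conf.set("spark.sql.hive.dropPartitionByName.enabled", "true")
    spark.sql("CREATE TABLE t (id INT, part INT) USING PARQUET PARTITIONED BY (part)")
    spark.sql("ALTER TABLE t ADD PARTITION (part=0)")
    // With the flag on, the drop resolves partitions via getPartitionNames
    spark.sql("ALTER TABLE t DROP PARTITION (part=0)")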


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org