You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2023/03/09 00:38:58 UTC
[spark] branch branch-3.4 updated: [SPARK-42480][SQL] Improve the performance of drop partitions
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5eb4edf53c4 [SPARK-42480][SQL] Improve the performance of drop partitions
5eb4edf53c4 is described below
commit 5eb4edf53c4bdda9d9ce8847048d195fb2b5c6a2
Author: wecharyu <yu...@gmail.com>
AuthorDate: Wed Mar 8 16:30:01 2023 -0800
[SPARK-42480][SQL] Improve the performance of drop partitions
### What changes were proposed in this pull request?
1. Fetch matching partition names rather than partition objects when dropping partitions
### Why are the changes needed?
1. Partition names are enough to drop partitions
2. It can reduce the time overhead and driver memory overhead.
### Does this PR introduce _any_ user-facing change?
Yes, we have added a new SQL conf to enable this feature: `spark.sql.hive.dropPartitionByName.enabled`
### How was this patch tested?
Add new tests.
Closes #40069 from wecharyu/SPARK-42480.
Authored-by: wecharyu <yu...@gmail.com>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++++++
.../command/AlterTableDropPartitionSuiteBase.scala | 18 ++++++++++++++++
.../org/apache/spark/sql/hive/HiveUtils.scala | 13 +++++++++++
.../spark/sql/hive/client/HiveClientImpl.scala | 21 ++++++++++++------
.../command/AlterTableDropPartitionSuite.scala | 25 ++++++++++++++++++++++
5 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 21bf571c494..5a93fdde304 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1227,6 +1227,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_METASTORE_DROP_PARTITION_BY_NAME =
+ buildConf("spark.sql.hive.dropPartitionByName.enabled")
+ .doc("When true, Spark will get partition name rather than partition object " +
+ "to drop partition, which can improve the performance of drop partition.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(false)
+
val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -4472,6 +4480,8 @@ class SQLConf extends Serializable with Logging {
def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
+ def metastoreDropPartitionsByName: Boolean = getConf(HIVE_METASTORE_DROP_PARTITION_BY_NAME)
+
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
def metastorePartitionPruningInSetThreshold: Int =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index 3f15533ca5f..eaf305414f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -284,4 +284,22 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
}
}
}
+
+ test("SPARK-42480: drop partition when dropPartitionByName enabled") {
+ withSQLConf(SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+ withNamespaceAndTable("ns", "tbl") { t =>
+ sql(s"CREATE TABLE $t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
+ sql(s"ALTER TABLE $t ADD PARTITION (region='=reg1') LOCATION 'loc1'")
+ checkPartitions(t, Map("region" -> "=reg1"))
+ sql(s"ALTER TABLE $t PARTITION (region='=reg1') RENAME TO PARTITION (region='=%reg1')")
+ checkPartitions(t, Map("region" -> "=%reg1"))
+ sql(s"ALTER TABLE $t DROP PARTITION (region='=%reg1')")
+ checkPartitions(t)
+ sql(s"ALTER TABLE $t ADD PARTITION (region='reg?2') LOCATION 'loc2'")
+ checkPartitions(t, Map("region" -> "reg?2"))
+ sql(s"ALTER TABLE $t DROP PARTITION (region='reg?2')")
+ checkPartitions(t)
+ }
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e..0b59011f4e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -28,6 +28,8 @@ import scala.util.Try
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
@@ -50,6 +52,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
private[spark] object HiveUtils extends Logging {
+ private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r
/** The version of hive used internally by Spark SQL. */
val builtinHiveVersion: String = HiveVersionInfo.getVersion
@@ -591,4 +594,14 @@ private[spark] object HiveUtils extends Logging {
table.copy(schema = StructType((dataCols ++ partCols).toArray))
}
}
+
+ /**
+ * Extract the partition values from a partition name, e.g., if a partition name is
+ * "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
+ */
+ def partitionNameToValues(name: String): Array[String] = {
+ name.split(Path.SEPARATOR).map {
+ case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0a83ec2689c..aaa0afc344d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -679,7 +679,6 @@ private[hive] class HiveClientImpl(
purge: Boolean,
retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
- val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
val matchingParts =
specs.flatMap { s =>
@@ -687,11 +686,21 @@ private[hive] class HiveClientImpl(
// The provided spec here can be a partial spec, i.e. it will match all partitions
// whose specs are supersets of this partial spec. E.g. If a table has partitions
// (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
- val parts = shim.getPartitions(client, hiveTable, s.asJava)
- if (parts.isEmpty && !ignoreIfNotExists) {
- throw new NoSuchPartitionsException(db, table, Seq(s))
+ val dropPartitionByName = SQLConf.get.metastoreDropPartitionsByName
+ if (dropPartitionByName) {
+ val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
+ if (partitionNames.isEmpty && !ignoreIfNotExists) {
+ throw new NoSuchPartitionsException(db, table, Seq(s))
+ }
+ partitionNames.map(HiveUtils.partitionNameToValues(_).toList.asJava)
+ } else {
+ val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
+ val parts = shim.getPartitions(client, hiveTable, s.asJava)
+ if (parts.isEmpty && !ignoreIfNotExists) {
+ throw new NoSuchPartitionsException(db, table, Seq(s))
+ }
+ parts.map(_.getValues)
}
- parts.map(_.getValues)
}.distinct
val droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
index de995c120e6..05033260351 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
@@ -50,4 +50,29 @@ class AlterTableDropPartitionSuite
}
}
}
+
+ test("SPARK-42480: hive client calls when dropPartitionByName enabled") {
+ Seq(false, true).foreach { statsOn =>
+ withSQLConf(
+ SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> statsOn.toString,
+ SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+ withNamespaceAndTable("ns", "tbl") { t =>
+ sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
+ sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+ sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+ sql(s"ALTER TABLE $t ADD PARTITION (part=2)") // empty partition
+ checkHiveClientCalls(expected = if (statsOn) 25 else 17) {
+ sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
+ }
+ checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+ sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
+ }
+ sql(s"CACHE TABLE $t")
+ checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+ sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org