Posted to commits@spark.apache.org by we...@apache.org on 2021/10/11 07:01:51 UTC
[spark] branch master updated: [SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 32e11ea [SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec
32e11ea is described below
commit 32e11ea1423ec25ed6a6b9aedaa7a9d26220244a
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Oct 11 15:00:51 2021 +0800
[SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec
### What changes were proposed in this pull request?
The current code only supports dynamic partition pruning for DSV1 and DSV2 scans; this change adds support for HiveTableScanExec as well.
### Why are the changes needed?
To extend dynamic partition pruning to Hive table scans, so that partitions filtered out at runtime are never read.
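As an illustrative sketch (added for context, not part of the patch; table and column names are made up), this is the query shape the optimization targets: a selective filter on a small dimension table pruning partitions of a Hive-serde fact table.

```scala
// Hypothetical setup: a Hive-serde fact table partitioned by store_id and a
// small dimension table (requires a Hive-enabled SparkSession).
spark.sql("CREATE TABLE fact (id BIGINT) PARTITIONED BY (store_id INT) STORED AS PARQUET")
spark.sql("CREATE TABLE dim (store_id INT, country STRING) STORED AS PARQUET")

// With spark.sql.optimizer.dynamicPartitionPruning.enabled=true (the default),
// the selective filter on dim is turned into a runtime filter on fact.store_id,
// so after this patch only the matching Hive partitions of fact are scanned.
spark.sql(
  """SELECT f.id
    |FROM fact f JOIN dim d ON f.store_id = d.store_id
    |WHERE d.country = 'DE'
    |""".stripMargin).collect()
```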
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Added unit tests
Closes #34139 from AngersZhuuuu/SPARK-36876.
Lead-authored-by: Angerszhuuuu <an...@gmail.com>
Co-authored-by: AngersZhuuuu <an...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../CleanupDynamicPruningFilters.scala | 3 +
.../dynamicpruning/PartitionPruning.scala | 11 ++-
.../spark/sql/DynamicPartitionPruningSuite.scala | 99 ++++++++++++----------
.../sql/hive/execution/HiveTableScanExec.scala | 10 ++-
.../DynamicPartitionPruningHiveScanSuite.scala | 54 ++++++++++++
5 files changed, 131 insertions(+), 46 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
index bcaed52..abf0cf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.dynamicpruning
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -43,6 +44,8 @@ object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelp
_.containsAnyPattern(DYNAMIC_PRUNING_EXPRESSION, DYNAMIC_PRUNING_SUBQUERY)) {
// pass through anything that is pushed down into PhysicalOperation
case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
+ // pass through anything that is pushed down into PhysicalOperation
+ case p @ PhysicalOperation(_, _, HiveTableRelation(_, _, _, _, _)) => p
case p @ PhysicalOperation(_, _, _: DataSourceV2ScanRelation) => p
// remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
case f @ Filter(condition, _) =>
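One hedged way to observe the effect of the new pass-through case (reusing the hypothetical fact/dim tables from the sketch above): without it, this rule would rewrite a pruning filter sitting on a Hive relation to a constant true.

```scala
// Hypothetical sanity check: with the pass-through case above, a DynamicPruning
// filter pushed onto a HiveTableRelation survives cleanup, so the optimized and
// physical plans keep a dynamicpruning predicate on the Hive scan.
val df = spark.sql(
  """SELECT f.id FROM fact f JOIN dim d ON f.store_id = d.store_id
    |WHERE d.country = 'DE'
    |""".stripMargin)
df.explain(true)
// Exact plan rendering varies across Spark versions; the point is that the
// pruning predicate on the Hive scan is no longer cleaned up to `true`.
```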
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 1ca1978..4b5f724 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.dynamicpruning
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
@@ -53,8 +54,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
/**
* Searches for a table scan that can be filtered for a given column in a logical plan.
*
- * This methods tries to find either a v1 partitioned scan for a given partition column or
- * a v2 scan that support runtime filtering on a given attribute.
+ * This method tries to find either a v1 or Hive serde partitioned scan for a given
+ * partition column, or a v2 scan that supports runtime filtering on a given attribute.
*/
def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = {
val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan)
@@ -71,6 +72,12 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
}
case _ => None
}
+ case (resExp, l: HiveTableRelation) =>
+ if (resExp.references.subsetOf(AttributeSet(l.partitionCols))) {
+ return Some(l)
+ } else {
+ None
+ }
case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _)) =>
val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
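A minimal standalone sketch of the eligibility check the new HiveTableRelation case performs (attribute names invented for illustration): an expression can drive pruning only if every attribute it references is a partition column.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.IntegerType

val storeId = AttributeReference("store_id", IntegerType)() // partition column
val code = AttributeReference("code", IntegerType)()        // data column
val partitionCols = AttributeSet(storeId :: Nil)

// Same subset test as resExp.references.subsetOf(AttributeSet(l.partitionCols)):
assert(AttributeSet(storeId :: Nil).subsetOf(partitionCols))           // prunable
assert(!AttributeSet(storeId :: code :: Nil).subsetOf(partitionCols))  // not prunable
```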
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index dc84d6e..d7245c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -31,14 +31,14 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
/**
* Test suite for the filtering ratio policy used to trigger dynamic partition pruning (DPP).
*/
abstract class DynamicPartitionPruningSuiteBase
extends QueryTest
- with SharedSparkSession
+ with SQLTestUtils
with GivenWhenThen
with AdaptiveSparkPlanHelper {
@@ -49,7 +49,7 @@ abstract class DynamicPartitionPruningSuiteBase
protected def initState(): Unit = {}
protected def runAnalyzeColumnCommands: Boolean = true
- override def beforeAll(): Unit = {
+ override protected def beforeAll(): Unit = {
super.beforeAll()
initState()
@@ -107,6 +107,10 @@ abstract class DynamicPartitionPruningSuiteBase
(6, 60)
)
+ if (tableFormat == "hive") {
+ spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ }
+
spark.range(1000)
.select($"id" as "product_id", ($"id" % 10) as "store_id", ($"id" + 1) as "code")
.write
@@ -150,11 +154,12 @@ abstract class DynamicPartitionPruningSuiteBase
if (runAnalyzeColumnCommands) {
sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
+ sql("ANALYZE TABLE dim_store COMPUTE STATISTICS FOR COLUMNS store_id")
sql("ANALYZE TABLE code_stats COMPUTE STATISTICS FOR COLUMNS store_id")
}
}
- override def afterAll(): Unit = {
+ override protected def afterAll(): Unit = {
try {
sql("DROP TABLE IF EXISTS fact_np")
sql("DROP TABLE IF EXISTS fact_sk")
@@ -248,7 +253,7 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* Collect the children of all correctly pushed down dynamic pruning expressions in a spark plan.
*/
- private def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+ protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
flatMap(plan) {
case s: FileSourceScanExec => s.partitionFilters.collect {
case d: DynamicPruningExpression => d.child
@@ -466,7 +471,8 @@ abstract class DynamicPartitionPruningSuiteBase
Given("no stats and selective predicate with the size of dim too large")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.02") {
withTable("fact_aux") {
sql(
"""
@@ -1020,41 +1026,6 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
- test("no partition pruning when the build side is a stream") {
- withTable("fact") {
- val input = MemoryStream[Int]
- val stream = input.toDF.select($"value" as "one", ($"value" * 3) as "code")
- spark.range(100).select(
- $"id",
- ($"id" + 1).as("one"),
- ($"id" + 2).as("two"),
- ($"id" + 3).as("three"))
- .write.partitionBy("one")
- .format(tableFormat).mode("overwrite").saveAsTable("fact")
- val table = sql("SELECT * from fact f")
-
- // join a partitioned table with a stream
- val joined = table.join(stream, Seq("one")).where("code > 40")
- val query = joined.writeStream.format("memory").queryName("test").start()
- input.addData(1, 10, 20, 40, 50)
- try {
- query.processAllAvailable()
- } finally {
- query.stop()
- }
- // search dynamic pruning predicates on the executed plan
- val plan = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
- val ret = plan.find {
- case s: FileSourceScanExec => s.partitionFilters.exists {
- case _: DynamicPruningExpression => true
- case _ => false
- }
- case _ => false
- }
- assert(ret.isDefined == false)
- }
- }
-
test("avoid reordering broadcast join keys to match input hash partitioning") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
@@ -1512,7 +1483,49 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
-abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningSuiteBase {
+abstract class DynamicPartitionPruningDataSourceSuiteBase
+ extends DynamicPartitionPruningSuiteBase
+ with SharedSparkSession {
+
+ import testImplicits._
+
+ test("no partition pruning when the build side is a stream") {
+ withTable("fact") {
+ val input = MemoryStream[Int]
+ val stream = input.toDF.select($"value" as "one", ($"value" * 3) as "code")
+ spark.range(100).select(
+ $"id",
+ ($"id" + 1).as("one"),
+ ($"id" + 2).as("two"),
+ ($"id" + 3).as("three"))
+ .write.partitionBy("one")
+ .format(tableFormat).mode("overwrite").saveAsTable("fact")
+ val table = sql("SELECT * from fact f")
+
+ // join a partitioned table with a stream
+ val joined = table.join(stream, Seq("one")).where("code > 40")
+ val query = joined.writeStream.format("memory").queryName("test").start()
+ input.addData(1, 10, 20, 40, 50)
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+ // search dynamic pruning predicates on the executed plan
+ val plan = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
+ val ret = plan.find {
+ case s: FileSourceScanExec => s.partitionFilters.exists {
+ case _: DynamicPruningExpression => true
+ case _ => false
+ }
+ case _ => false
+ }
+ assert(ret.isDefined == false)
+ }
+ }
+}
+
+abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDataSourceSuiteBase {
import testImplicits._
@@ -1606,7 +1619,7 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
with EnableAdaptiveExecutionSuite
-abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningSuiteBase {
+abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningDataSourceSuiteBase {
override protected def runAnalyzeColumnCommands: Boolean = false
override protected def initState(): Unit = {
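For reference (not part of the diff), the SQLConf constants toggled throughout this suite correspond to these runtime keys, which a user can also set directly:

```scala
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", "0.02")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
```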
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 936cca4..05dd3ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -224,12 +224,20 @@ case class HiveTableScanExec(
}
}
+ // Filters out unused DynamicPruningExpressions, i.e. ones that were replaced
+ // with DynamicPruningExpression(Literal.TrueLiteral) during physical planning.
+ private def filterUnusedDynamicPruningExpressions(
+ predicates: Seq[Expression]): Seq[Expression] = {
+ predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+ }
+
override def doCanonicalize(): HiveTableScanExec = {
val input: AttributeSeq = relation.output
HiveTableScanExec(
requestedAttributes.map(QueryPlan.normalizeExpressions(_, input)),
relation.canonicalized.asInstanceOf[HiveTableRelation],
- QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
+ QueryPlan.normalizePredicates(
+ filterUnusedDynamicPruningExpressions(partitionPruningPred), input))(sparkSession)
}
override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)
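The motivation for the new helper: when planning replaces a pruning subquery that cannot be reused with DynamicPruningExpression(Literal.TrueLiteral), two Hive scans that differ only in that no-op predicate should still canonicalize to the same plan, so exchanges and subqueries can be reused. A minimal standalone sketch of the filtering step (with a simplified stand-in predicate):

```scala
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, EqualTo, Expression, Literal}

val noOp: Expression = DynamicPruningExpression(Literal.TrueLiteral)
val real: Expression = EqualTo(Literal(1), Literal(2)) // stand-in pruning predicate

// The same filterNot the patch applies before normalizing predicates:
val kept = Seq(noOp, real).filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
assert(kept == Seq(real))
```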
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
new file mode 100644
index 0000000..bdd18a7
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+abstract class DynamicPartitionPruningHiveScanSuite
+ extends DynamicPartitionPruningSuiteBase with TestHiveSingleton with SQLTestUtils {
+
+ override val tableFormat: String = "hive"
+
+ override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+ flatMap(plan) {
+ case s: FileSourceScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case h: HiveTableScanExec => h.partitionPruningPred.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case _ => Nil
+ }
+ }
+}
+
+class DynamicPartitionPruningHiveScanSuiteAEOff extends DynamicPartitionPruningHiveScanSuite
+ with DisableAdaptiveExecutionSuite
+
+class DynamicPartitionPruningHiveScanSuiteAEOn extends DynamicPartitionPruningHiveScanSuite
+ with EnableAdaptiveExecutionSuite
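To run the new suites locally, the usual Spark dev workflow should apply (the exact invocation is an assumption and may differ by build setup), e.g. build/sbt -Phive "hive/testOnly *DynamicPartitionPruningHiveScanSuite*".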