You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/10/11 07:01:51 UTC

[spark] branch master updated: [SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e11ea  [SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec
32e11ea is described below

commit 32e11ea1423ec25ed6a6b9aedaa7a9d26220244a
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Oct 11 15:00:51 2021 +0800

    [SPARK-36876][SQL] Support Dynamic Partition pruning for HiveTableScanExec
    
    ### What changes were proposed in this pull request?
    The current code only supports dynamic partition pruning for DSV1 and DSV2 scans; this change adds support for HiveTableScanExec.
    
    ### Why are the changes needed?
    To optimize Hive table scans by applying dynamic partition pruning to them.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Added unit tests (DynamicPartitionPruningHiveScanSuite).
    
    Closes #34139 from AngersZhuuuu/SPARK-36876.
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: AngersZhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../CleanupDynamicPruningFilters.scala             |  3 +
 .../dynamicpruning/PartitionPruning.scala          | 11 ++-
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 99 ++++++++++++----------
 .../sql/hive/execution/HiveTableScanExec.scala     | 10 ++-
 .../DynamicPartitionPruningHiveScanSuite.scala     | 54 ++++++++++++
 5 files changed, 131 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
index bcaed52..abf0cf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -43,6 +44,8 @@ object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelp
       _.containsAnyPattern(DYNAMIC_PRUNING_EXPRESSION, DYNAMIC_PRUNING_SUBQUERY)) {
       // pass through anything that is pushed down into PhysicalOperation
       case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
+      // pass through anything that is pushed down into PhysicalOperation
+      case p @ PhysicalOperation(_, _, HiveTableRelation(_, _, _, _, _)) => p
       case p @ PhysicalOperation(_, _, _: DataSourceV2ScanRelation) => p
       // remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
       case f @ Filter(condition, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 1ca1978..4b5f724 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
@@ -53,8 +54,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Searches for a table scan that can be filtered for a given column in a logical plan.
    *
-   * This methods tries to find either a v1 partitioned scan for a given partition column or
-   * a v2 scan that support runtime filtering on a given attribute.
+   * This method tries to find either a v1 or Hive serde partitioned scan for a given
+   * partition column or a v2 scan that supports runtime filtering on a given attribute.
    */
   def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = {
     val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan)
@@ -71,6 +72,12 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
             }
           case _ => None
         }
+      case (resExp, l: HiveTableRelation) =>
+        if (resExp.references.subsetOf(AttributeSet(l.partitionCols))) {
+          return Some(l)
+        } else {
+          None
+        }
       case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _)) =>
         val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
         if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index dc84d6e..d7245c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -31,14 +31,14 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 
 /**
  * Test suite for the filtering ratio policy used to trigger dynamic partition pruning (DPP).
  */
 abstract class DynamicPartitionPruningSuiteBase
     extends QueryTest
-    with SharedSparkSession
+    with SQLTestUtils
     with GivenWhenThen
     with AdaptiveSparkPlanHelper {
 
@@ -49,7 +49,7 @@ abstract class DynamicPartitionPruningSuiteBase
   protected def initState(): Unit = {}
   protected def runAnalyzeColumnCommands: Boolean = true
 
-  override def beforeAll(): Unit = {
+  override protected def beforeAll(): Unit = {
     super.beforeAll()
 
     initState()
@@ -107,6 +107,10 @@ abstract class DynamicPartitionPruningSuiteBase
       (6, 60)
     )
 
+    if (tableFormat == "hive") {
+      spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    }
+
     spark.range(1000)
       .select($"id" as "product_id", ($"id" % 10) as "store_id", ($"id" + 1) as "code")
       .write
@@ -150,11 +154,12 @@ abstract class DynamicPartitionPruningSuiteBase
     if (runAnalyzeColumnCommands) {
       sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
       sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
+      sql("ANALYZE TABLE dim_store COMPUTE STATISTICS FOR COLUMNS store_id")
       sql("ANALYZE TABLE code_stats COMPUTE STATISTICS FOR COLUMNS store_id")
     }
   }
 
-  override def afterAll(): Unit = {
+  override protected def afterAll(): Unit = {
     try {
       sql("DROP TABLE IF EXISTS fact_np")
       sql("DROP TABLE IF EXISTS fact_sk")
@@ -248,7 +253,7 @@ abstract class DynamicPartitionPruningSuiteBase
   /**
    * Collect the children of all correctly pushed down dynamic pruning expressions in a spark plan.
    */
-  private def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+  protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
     flatMap(plan) {
       case s: FileSourceScanExec => s.partitionFilters.collect {
         case d: DynamicPruningExpression => d.child
@@ -466,7 +471,8 @@ abstract class DynamicPartitionPruningSuiteBase
 
       Given("no stats and selective predicate with the size of dim too large")
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-          SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
+          SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true",
+          SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.02") {
         withTable("fact_aux") {
           sql(
             """
@@ -1020,41 +1026,6 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("no partition pruning when the build side is a stream") {
-    withTable("fact") {
-      val input = MemoryStream[Int]
-      val stream = input.toDF.select($"value" as "one", ($"value" * 3) as "code")
-      spark.range(100).select(
-        $"id",
-        ($"id" + 1).as("one"),
-        ($"id" + 2).as("two"),
-        ($"id" + 3).as("three"))
-        .write.partitionBy("one")
-        .format(tableFormat).mode("overwrite").saveAsTable("fact")
-      val table = sql("SELECT * from fact f")
-
-      // join a partitioned table with a stream
-      val joined = table.join(stream, Seq("one")).where("code > 40")
-      val query = joined.writeStream.format("memory").queryName("test").start()
-      input.addData(1, 10, 20, 40, 50)
-      try {
-        query.processAllAvailable()
-      } finally {
-        query.stop()
-      }
-      // search dynamic pruning predicates on the executed plan
-      val plan = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
-      val ret = plan.find {
-        case s: FileSourceScanExec => s.partitionFilters.exists {
-          case _: DynamicPruningExpression => true
-          case _ => false
-        }
-        case _ => false
-      }
-      assert(ret.isDefined == false)
-    }
-  }
-
   test("avoid reordering broadcast join keys to match input hash partitioning") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
@@ -1512,7 +1483,49 @@ abstract class DynamicPartitionPruningSuiteBase
   }
 }
 
-abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningSuiteBase {
+abstract class DynamicPartitionPruningDataSourceSuiteBase
+    extends DynamicPartitionPruningSuiteBase
+    with SharedSparkSession {
+
+  import testImplicits._
+
+  test("no partition pruning when the build side is a stream") {
+    withTable("fact") {
+      val input = MemoryStream[Int]
+      val stream = input.toDF.select($"value" as "one", ($"value" * 3) as "code")
+      spark.range(100).select(
+        $"id",
+        ($"id" + 1).as("one"),
+        ($"id" + 2).as("two"),
+        ($"id" + 3).as("three"))
+        .write.partitionBy("one")
+        .format(tableFormat).mode("overwrite").saveAsTable("fact")
+      val table = sql("SELECT * from fact f")
+
+      // join a partitioned table with a stream
+      val joined = table.join(stream, Seq("one")).where("code > 40")
+      val query = joined.writeStream.format("memory").queryName("test").start()
+      input.addData(1, 10, 20, 40, 50)
+      try {
+        query.processAllAvailable()
+      } finally {
+        query.stop()
+      }
+      // search dynamic pruning predicates on the executed plan
+      val plan = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.executedPlan
+      val ret = plan.find {
+        case s: FileSourceScanExec => s.partitionFilters.exists {
+          case _: DynamicPruningExpression => true
+          case _ => false
+        }
+        case _ => false
+      }
+      assert(ret.isDefined == false)
+    }
+  }
+}
+
+abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDataSourceSuiteBase {
 
   import testImplicits._
 
@@ -1606,7 +1619,7 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite
 
-abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningSuiteBase {
+abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningDataSourceSuiteBase {
   override protected def runAnalyzeColumnCommands: Boolean = false
 
   override protected def initState(): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 936cca4..05dd3ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -224,12 +224,20 @@ case class HiveTableScanExec(
     }
   }
 
+  // Filters unused DynamicPruningExpression expressions - one which has been replaced
+  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
+  private def filterUnusedDynamicPruningExpressions(
+      predicates: Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+
   override def doCanonicalize(): HiveTableScanExec = {
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExpressions(_, input)),
       relation.canonicalized.asInstanceOf[HiveTableRelation],
-      QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionPruningPred), input))(sparkSession)
   }
 
   override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
new file mode 100644
index 0000000..bdd18a7
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+abstract class DynamicPartitionPruningHiveScanSuite
+    extends DynamicPartitionPruningSuiteBase with TestHiveSingleton with SQLTestUtils {
+
+  override val tableFormat: String = "hive"
+
+  override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+    flatMap(plan) {
+      case s: FileSourceScanExec => s.partitionFilters.collect {
+        case d: DynamicPruningExpression => d.child
+      }
+      case s: BatchScanExec => s.runtimeFilters.collect {
+        case d: DynamicPruningExpression => d.child
+      }
+      case h: HiveTableScanExec => h.partitionPruningPred.collect {
+        case d: DynamicPruningExpression => d.child
+      }
+      case _ => Nil
+    }
+  }
+}
+
+class DynamicPartitionPruningHiveScanSuiteAEOff extends DynamicPartitionPruningHiveScanSuite
+  with DisableAdaptiveExecutionSuite
+
+class DynamicPartitionPruningHiveScanSuiteAEOn extends DynamicPartitionPruningHiveScanSuite
+  with EnableAdaptiveExecutionSuite

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org