You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/04 05:38:43 UTC

[spark] branch master updated: [SPARK-26893][SQL] Allow partition pruning with subquery filters on file source

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82820f8  [SPARK-26893][SQL] Allow partition pruning with subquery filters on file source
82820f8 is described below

commit 82820f8e2574ca18faa00b54dff1a3a44d105359
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Mon Mar 4 13:38:22 2019 +0800

    [SPARK-26893][SQL] Allow partition pruning with subquery filters on file source
    
    ## What changes were proposed in this pull request?
    
    This PR allows subquery filters to be leveraged for partition pruning in file sources.
    
    Currently, subquery expressions cannot be used for partition pruning in `FileSourceStrategy`; instead, a `FilterExec` is added around the `FileSourceScanExec` to do the job.
    This PR optimizes the process by allowing partition-pruning subquery expressions to be used as partition filters.
    
    ## How was this patch tested?
    
    Added a new UT and ran the existing UTs, especially the ones related to SPARK-25482 and SPARK-24085.
    
    Closes #23802 from peter-toth/SPARK-26893.
    
    Authored-by: Peter Toth <pe...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 22 +++++++++++++++-------
 .../execution/datasources/DataSourceStrategy.scala |  2 +-
 .../execution/datasources/FileSourceStrategy.scala | 10 ++++++++--
 .../datasources/PruneFileSourcePartitions.scala    | 11 ++---------
 .../datasources/v2/DataSourceV2Strategy.scala      |  5 +++--
 .../org/apache/spark/sql/execution/subquery.scala  | 12 ++++++++++++
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 21 +++++++++++++++++++++
 7 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 3aed2ce..92f7d66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -187,6 +187,14 @@ case class FileSourceScanExec(
     ret
   }
 
+  /**
+   * [[partitionFilters]] can contain subqueries whose results are available only at runtime so
+   * accessing [[selectedPartitions]] should be guarded by this method during planning
+   */
+  private def hasPartitionsAvailableAtRunTime: Boolean = {
+    partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
+  }
+
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
@@ -223,7 +231,7 @@ case class FileSourceScanExec(
           val sortColumns =
             spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
 
-          val sortOrder = if (sortColumns.nonEmpty) {
+          val sortOrder = if (sortColumns.nonEmpty && !hasPartitionsAvailableAtRunTime) {
             // In case of bucketing, its possible to have multiple files belonging to the
             // same bucket in a given relation. Each of these files are locally sorted
             // but those files combined together are not globally sorted. Given that,
@@ -272,12 +280,12 @@ case class FileSourceScanExec(
         "PushedFilters" -> seqToString(pushedDownFilters),
         "DataFilters" -> seqToString(dataFilters),
         "Location" -> locationDesc)
-    val withOptPartitionCount =
-      relation.partitionSchemaOption.map { _ =>
-        metadata + ("PartitionCount" -> selectedPartitions.size.toString)
-      } getOrElse {
-        metadata
-      }
+    val withOptPartitionCount = if (relation.partitionSchemaOption.isDefined &&
+      !hasPartitionsAvailableAtRunTime) {
+      metadata + ("PartitionCount" -> selectedPartitions.size.toString)
+    } else {
+      metadata
+    }
 
     val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
       val numSelectedBuckets = optionalBucketSet.map { b =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b73dc30..a1252ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -434,7 +434,7 @@ object DataSourceStrategy {
   protected[sql] def normalizeFilters(
       filters: Seq[Expression],
       attributes: Seq[AttributeReference]): Seq[Expression] = {
-    filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+    filters.map { e =>
       e transform {
         case a: AttributeReference =>
           a.withName(attributes.find(_.semanticEquals(a)).get.name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 970cbda..c8a42f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -159,9 +159,14 @@ object FileSourceStrategy extends Strategy with Logging {
 
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
+      // subquery expressions are filtered out because they can't be used to prune buckets or pushed
+      // down as data filters, yet they would be executed
+      val normalizedFiltersWithoutSubqueries =
+        normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
+
       val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
       val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
-        genBucketSet(normalizedFilters, bucketSpec.get)
+        genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get)
       } else {
         None
       }
@@ -170,7 +175,8 @@ object FileSourceStrategy extends Strategy with Logging {
         l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
+      val dataFilters =
+        normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 329b953..9db7c30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -39,15 +39,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             _,
             _))
         if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we donot need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(
+        filters.filterNot(SubqueryExpression.hasSubquery), logicalRelation.output)
 
       val sparkSession = fsRelation.sparkSession
       val partitionColumns =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 55d7b0a..bf60626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -106,7 +106,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
 
-      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(
+        filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index e180d22..e084c79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -38,6 +38,18 @@ abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] {
   def updateResult(): Unit
 }
 
+object ExecSubqueryExpression {
+  /**
+   * Returns true when an expression contains a subquery
+   */
+  def hasSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ExecSubqueryExpression => true
+      case _ => false
+    }.isDefined
+  }
+}
+
 /**
  * A subquery that will return only one row and one column.
  *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 8ade433..53eb7ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
+import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SubquerySuite extends QueryTest with SharedSQLContext {
@@ -1281,6 +1283,25 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-26893: Allow pushdown of partition pruning subquery filters to file source") {
+    withTable("a", "b") {
+      spark.range(4).selectExpr("id", "id % 2 AS p").write.partitionBy("p").saveAsTable("a")
+      spark.range(2).write.saveAsTable("b")
+
+      val df = sql("SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b)")
+      checkAnswer(df, Seq(Row(0, 0), Row(2, 0)))
+      // need to execute the query before we can examine fs.inputRDDs()
+      assert(df.queryExecution.executedPlan match {
+        case WholeStageCodegenExec(fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)) =>
+          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+            fs.inputRDDs().forall(
+              _.asInstanceOf[FileScanRDD].filePartitions.forall(
+                _.files.forall(_.filePath.contains("p=0"))))
+        case _ => false
+      })
+    }
+  }
+
   test("SPARK-26078: deduplicate fake self joins for IN subqueries") {
     withTempView("a", "b") {
       Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org