Posted to commits@spark.apache.org by li...@apache.org on 2018/04/30 16:13:36 UTC

spark git commit: [SPARK-24072][SQL] clearly define pushed filters

Repository: spark
Updated Branches:
  refs/heads/master 3121b411f -> b42ad165b


[SPARK-24072][SQL] clearly define pushed filters

## What changes were proposed in this pull request?

Filters like the Parquet row group filter, which are actually pushed to the data source but still need to be evaluated by Spark, should also count as `pushedFilters`.
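
A minimal sketch of the intended contract (illustrative only, not code from this patch): a hypothetical reader whose pushdown is purely best-effort would return the accepted filters from both `pushFilters` (they still need post-scan evaluation) and `pushedFilters`.

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters}

// Hypothetical reader: it can use some filters for pruning (like parquet row
// group filters), but the pruning is best-effort, so Spark must still
// re-evaluate every filter after the scan.
abstract class BestEffortFilterReader extends DataSourceReader with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  // Hypothetical hook: whether this source can use a filter for pruning.
  protected def canPrune(filter: Filter): Boolean

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters.filter(canPrune)
    // Everything still needs post-scan evaluation: the accepted filters are
    // case 2 (pushed but re-evaluated), the rest are case 3 (not pushable).
    filters
  }

  // Cases 1 and 2 together: every filter this source applies during the scan.
  override def pushedFilters(): Array[Filter] = pushed
}
```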

## How was this patch tested?

existing tests

Author: Wenchen Fan <we...@databricks.com>

Closes #21143 from cloud-fan/step1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b42ad165
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b42ad165
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b42ad165

Branch: refs/heads/master
Commit: b42ad165bb93c96cc5be9ed05b5026f9baafdfa2
Parents: 3121b41
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Apr 30 09:13:32 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Apr 30 09:13:32 2018 -0700

----------------------------------------------------------------------
 .../reader/SupportsPushDownCatalystFilters.java | 11 +++-
 .../v2/reader/SupportsPushDownFilters.java      | 10 +++-
 .../datasources/v2/DataSourceV2Relation.scala   | 63 ++++++++++++--------
 .../datasources/v2/DataSourceV2ScanExec.scala   |  1 +
 .../datasources/v2/DataSourceV2Strategy.scala   |  4 +-
 .../v2/DataSourceV2StringFormat.scala           | 19 ++----
 .../v2/PushDownOperatorsToDataSource.scala      |  6 +-
 .../streaming/continuous/ContinuousSuite.scala  |  2 +-
 8 files changed, 68 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
index 290d614..4543c14 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
@@ -39,7 +39,16 @@ public interface SupportsPushDownCatalystFilters extends DataSourceReader {
   Expression[] pushCatalystFilters(Expression[] filters);
 
   /**
-   * Returns the catalyst filters that are pushed in {@link #pushCatalystFilters(Expression[])}.
+   * Returns the catalyst filters that are pushed to the data source via
+   * {@link #pushCatalystFilters(Expression[])}.
+   *
+   * There are 3 kinds of filters:
+   *  1. pushable filters which don't need to be evaluated again after scanning.
+   *  2. pushable filters which still need to be evaluated after scanning, e.g. parquet
+   *     row group filter.
+   *  3. non-pushable filters.
+   * Both cases 1 and 2 should be considered pushed filters and should be returned by this method.
+   *
   * It's possible that there are no filters in the query and
   * {@link #pushCatalystFilters(Expression[])} is never called; an empty array should be returned
   * in this case.

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 1cff024..b6a90a3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -37,7 +37,15 @@ public interface SupportsPushDownFilters extends DataSourceReader {
   Filter[] pushFilters(Filter[] filters);
 
   /**
-   * Returns the filters that are pushed in {@link #pushFilters(Filter[])}.
+   * Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
+   *
+   * There are 3 kinds of filters:
+   *  1. pushable filters which don't need to be evaluated again after scanning.
+   *  2. pushable filters which still need to be evaluated after scanning, e.g. parquet
+   *     row group filter.
+   *  3. non-pushable filters.
+   * Both cases 1 and 2 should be considered pushed filters and should be returned by this method.
+   *
   * It's possible that there are no filters in the query and {@link #pushFilters(Filter[])}
   * is never called; an empty array should be returned in this case.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 2b282ff..90fb5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -54,9 +54,12 @@ case class DataSourceV2Relation(
 
   private lazy val v2Options: DataSourceOptions = makeV2Options(options)
 
+  // postScanFilters: filters that need to be evaluated after the scan.
+  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
+  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
   lazy val (
       reader: DataSourceReader,
-      unsupportedFilters: Seq[Expression],
+      postScanFilters: Seq[Expression],
       pushedFilters: Seq[Expression]) = {
     val newReader = userSpecifiedSchema match {
       case Some(s) =>
@@ -67,14 +70,16 @@ case class DataSourceV2Relation(
 
     DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
 
-    val (remainingFilters, pushedFilters) = filters match {
+    val (postScanFilters, pushedFilters) = filters match {
       case Some(filterSeq) =>
         DataSourceV2Relation.pushFilters(newReader, filterSeq)
       case _ =>
         (Nil, Nil)
     }
+    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
+    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
 
-    (newReader, remainingFilters, pushedFilters)
+    (newReader, postScanFilters, pushedFilters)
   }
 
   override def doCanonicalize(): LogicalPlan = {
@@ -121,6 +126,8 @@ case class StreamingDataSourceV2Relation(
 
   override def simpleString: String = "Streaming RelationV2 " + metadataString
 
+  override def pushedFilters: Seq[Expression] = Nil
+
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
@@ -217,31 +224,35 @@ object DataSourceV2Relation {
       reader: DataSourceReader,
       filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
     reader match {
-      case catalystFilterSupport: SupportsPushDownCatalystFilters =>
-        (
-            catalystFilterSupport.pushCatalystFilters(filters.toArray),
-            catalystFilterSupport.pushedCatalystFilters()
-        )
-
-      case filterSupport: SupportsPushDownFilters =>
-        // A map from original Catalyst expressions to corresponding translated data source
-        // filters. If a predicate is not in this map, it means it cannot be pushed down.
-        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
-          DataSourceStrategy.translateFilter(p).map(f => p -> f)
-        }.toMap
-
-        // Catalyst predicate expressions that cannot be converted to data source filters.
-        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
-
-        // Data source filters that cannot be pushed down. An unhandled filter means
-        // the data source cannot guarantee the rows returned can pass the filter.
-        // As a result we must return it so Spark can plan an extra filter operator.
-        val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
-          unhandledFilters.contains(f)
+      case r: SupportsPushDownCatalystFilters =>
+        val postScanFilters = r.pushCatalystFilters(filters.toArray)
+        val pushedFilters = r.pushedCatalystFilters()
+        (postScanFilters, pushedFilters)
+
+      case r: SupportsPushDownFilters =>
+        // A map from translated data source filters to original catalyst filter expressions.
+        val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
+        // Catalyst filter expressions that can't be translated to data source filters.
+        val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]
+
+        for (filterExpr <- filters) {
+          val translated = DataSourceStrategy.translateFilter(filterExpr)
+          if (translated.isDefined) {
+            translatedFilterToExpr(translated.get) = filterExpr
+          } else {
+            untranslatableExprs += filterExpr
+          }
         }
 
-        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+        // Data source filters that need to be evaluated again after scanning, which means
+        // the data source cannot guarantee the rows returned can pass these filters.
+        // As a result we must return them so Spark can plan an extra filter operator.
+        val postScanFilters =
+          r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+
+        (untranslatableExprs ++ postScanFilters, pushedFilters)
 
       case _ => (filters, Nil)
     }
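
A toy walkthrough of the pushFilters helper above, with plain Strings standing in for catalyst Expressions and a hypothetical source that keeps one filter only as a best-effort pruning hint: that filter lands in both postScanFilters and pushedFilters, while the untranslatable predicate is post-scan only.

```scala
import scala.collection.mutable
import org.apache.spark.sql.sources.{Filter, GreaterThan}

// Translated form of a predicate like `a > 1`; assume `myUdf(b)` has no translation.
val f1: Filter = GreaterThan("a", 1)
val translatedFilterToExpr = mutable.HashMap[Filter, String](f1 -> "a > 1")
val untranslatableExprs = Seq("myUdf(b)")

// Pretend the reader keeps f1 only as a best-effort filter: it is returned
// from pushFilters (still needs evaluation) and also reported by pushedFilters.
val stillNeedsEvaluation = Seq(f1)
val reportedPushed = Seq(f1)

val postScanFilters = untranslatableExprs ++ stillNeedsEvaluation.map(translatedFilterToExpr)
val pushedFilters = reportedPushed.map(translatedFilterToExpr)
// postScanFilters == Seq("myUdf(b)", "a > 1"); pushedFilters == Seq("a > 1")
```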

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 3a5e7bf..41bdda4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -41,6 +41,7 @@ case class DataSourceV2ScanExec(
     output: Seq[AttributeReference],
     @transient source: DataSourceV2,
     @transient options: Map[String, String],
+    @transient pushedFilters: Seq[Expression],
     @transient reader: DataSourceReader)
   extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index c2a3144..1b7c639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDat
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
 
     case r: StreamingDataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
index aed55a4..693e67d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.DataSourceV2
-import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.util.Utils
 
 /**
@@ -49,16 +47,9 @@ trait DataSourceV2StringFormat {
   def options: Map[String, String]
 
   /**
-   * The created data source reader. Here we use it to get the filters that has been pushed down
-   * so far, itself doesn't take part in the equals/hashCode.
+   * The filters which have been pushed to the data source.
    */
-  def reader: DataSourceReader
-
-  private lazy val filters = reader match {
-    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
-    case s: SupportsPushDownFilters => s.pushedFilters().toSet
-    case _ => Set.empty
-  }
+  def pushedFilters: Seq[Expression]
 
   private def sourceName: String = source match {
     case registered: DataSourceRegister => registered.shortName()
@@ -68,8 +59,8 @@ trait DataSourceV2StringFormat {
   def metadataString: String = {
     val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
 
-    if (filters.nonEmpty) {
-      entries += "Filters" -> filters.mkString("[", ", ", "]")
+    if (pushedFilters.nonEmpty) {
+      entries += "Filters" -> pushedFilters.mkString("[", ", ", "]")
     }
 
     // TODO: we should only display some standard options like path, table, etc.
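
With pushedFilters now feeding metadataString, filters that were pushed but are still re-evaluated become visible in explain output alongside the Filter node Spark plans on top. A hypothetical usage sketch (the source name and the exact plan rendering are illustrative, not verbatim):

```scala
// In spark-shell, against a hypothetical v2 source with best-effort pushdown.
val df = spark.read
  .format("com.example.bestEffortSource")
  .load()
  .filter("id > 5")

df.explain()
// Illustrative plan shape: the predicate appears both in Spark's Filter node
// and in the scan node's "Filters: [...]" metadata.
// Filter (id#0L > 5)
// +- ScanV2 bestEffortSource[id#0L] (Filters: [GreaterThan(id,5)], ...)
```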

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index f23d228..9293d4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -57,9 +57,9 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
         projection = projection.asInstanceOf[Seq[AttributeReference]],
         filters = Some(filters))
 
-      // Add a Filter for any filters that could not be pushed
-      val unpushedFilter = newRelation.unsupportedFilters.reduceLeftOption(And)
-      val filtered = unpushedFilter.map(Filter(_, newRelation)).getOrElse(newRelation)
+      // Add a Filter for any filters that need to be evaluated after scan.
+      val postScanFilterCond = newRelation.postScanFilters.reduceLeftOption(And)
+      val filtered = postScanFilterCond.map(Filter(_, newRelation)).getOrElse(newRelation)
 
       // Add a Project to ensure the output matches the required projection
       if (newRelation.output != projectAttrs) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b42ad165/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 5f222e7..cd1704a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, _, _, r: RateStreamContinuousReader) => r
+          case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReader) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org