Posted to commits@spark.apache.org by do...@apache.org on 2023/02/08 05:04:50 UTC

[spark] branch branch-3.4 updated: [SPARK-40045][SQL] Optimize the order of filtering predicates

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e46e98d9f35 [SPARK-40045][SQL] Optimize the order of filtering predicates
e46e98d9f35 is described below

commit e46e98d9f357a715b95b75a1b44fc04f9d0ffa17
Author: huaxingao <hu...@apple.com>
AuthorDate: Tue Feb 7 21:04:30 2023 -0800

    [SPARK-40045][SQL] Optimize the order of filtering predicates
    
    All the credit for this PR goes to caican00. Here is the original [PR](https://github.com/apache/spark/pull/37479).
    
    ### What changes were proposed in this pull request?
    Put the untranslated filters to the right of the translated filters when building the post-scan filter condition.
    
    ### Why are the changes needed?
    Normally the translated filters (postScanFilters) are simple filters that can be evaluated faster, while the untranslated filters are complicated filters that take more time to evaluate, so we want to evaluate the postScanFilters first (see the illustrative sketch after the file summary below).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit test in DataSourceV2SQLSuite.
    
    Closes #39892 from huaxingao/filter_order.
    
    Authored-by: huaxingao <hu...@apple.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit fe67269394993d819f447364de04dfc95cd21775)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/connector/catalog/InMemoryBaseTable.scala  | 11 ++++-
 .../execution/datasources/v2/PushDownUtils.scala   | 10 ++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 50 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 4 deletions(-)
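
The gist of the PushDownUtils change below: when the remaining (post-scan) filter
condition is assembled, the translated filters (postScanFilters) are now placed
before the untranslatable expressions instead of after them, so the resulting
conjunction checks the cheap predicates first. A minimal sketch of that idea in
plain Scala (no Spark classes; Row, cheap and expensive are illustrative names,
not from the patch):

    object FilterOrderSketch {
      // Hypothetical row type for illustration only.
      final case class Row(id: Long, data: String)

      def main(args: Array[String]): Unit = {
        val rows = Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"))

        var expensiveCalls = 0
        // Stands in for an untranslatable, costly predicate (e.g. a Scala UDF).
        val expensive: Row => Boolean = { r => expensiveCalls += 1; r.data.length == 1 }
        // Stands in for a translated post-scan filter that is cheap to evaluate.
        val cheap: Row => Boolean = _.id == 2L

        // && short-circuits left to right, so with the cheap predicate first the
        // expensive one only runs for rows that already passed the cheap check.
        val matched = rows.filter(r => cheap(r) && expensive(r))
        println(s"matched = $matched, expensive predicate ran $expensiveCalls time(s)")
        // With the operands swapped (expensive(r) && cheap(r)) the expensive
        // predicate would run once per input row instead.
      }
    }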

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index e7c4c784b98..f169db11085 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -289,7 +289,7 @@ abstract class InMemoryBaseTable(
   }
 
   class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
-      with SupportsPushDownRequiredColumns {
+      with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
     private var schema: StructType = tableSchema
 
     override def build: Scan =
@@ -299,6 +299,15 @@ abstract class InMemoryBaseTable(
       val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
       schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
     }
+
+    private var _pushedFilters: Array[Filter] = Array.empty
+
+    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+      this._pushedFilters = filters
+      this._pushedFilters
+    }
+
+    override def pushedFilters(): Array[Filter] = this._pushedFilters
   }
 
   case class InMemoryStats(
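
The InMemoryBaseTable change above is test-only: InMemoryScanBuilder now mixes in
SupportsPushDownFilters, records whatever filters Spark offers, and returns them
all, i.e. every filter remains a post-scan filter, which is what lets the new test
observe the ordering in the final FilterExec. A hedged sketch (not part of the
patch; SketchScanBuilder is an illustrative name) of how a source that actually
handles some filters might partition them:

    import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownFilters}
    import org.apache.spark.sql.sources.{EqualTo, Filter}

    // Accepts only EqualTo filters for pushdown; everything else stays post-scan.
    abstract class SketchScanBuilder extends ScanBuilder with SupportsPushDownFilters {
      private var pushed: Array[Filter] = Array.empty

      override def pushFilters(filters: Array[Filter]): Array[Filter] = {
        val (supported, unsupported) = filters.partition(_.isInstanceOf[EqualTo])
        pushed = supported
        // The return value is what Spark still has to evaluate after the scan.
        unsupported
      }

      override def pushedFilters(): Array[Filter] = pushed
    }
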
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index eb37a27fd7c..fe19ac552f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -66,7 +66,10 @@ object PushDownUtils {
         val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
           DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
         }
-        (Left(r.pushedFilters()), (untranslatableExprs ++ postScanFilters).toSeq)
+        // Normally translated filters (postScanFilters) are simple filters that can be evaluated
+        // faster, while the untranslated filters are complicated filters that take more time to
+        // evaluate, so we want to evaluate the postScanFilters filters first.
+        (Left(r.pushedFilters()), (postScanFilters ++ untranslatableExprs).toSeq)
 
       case r: SupportsPushDownV2Filters =>
         // A map from translated data source leaf node filters to original catalyst filter
@@ -95,7 +98,10 @@ object PushDownUtils {
         val postScanFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
           DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
         }
-        (Right(r.pushedPredicates), (untranslatableExprs ++ postScanFilters).toSeq)
+        // Normally translated filters (postScanFilters) are simple filters that can be evaluated
+        // faster, while the untranslated filters are complicated filters that take more time to
+        // evaluate, so we want to evaluate the postScanFilters filters first.
+        (Right(r.pushedPredicates), (postScanFilters ++ untranslatableExprs).toSeq)
 
       case f: FileScanBuilder =>
         val postScanFilters = f.pushFilters(filters)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 673d8029c24..38bd24356f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.execution.FilterExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -49,7 +51,8 @@ import org.apache.spark.unsafe.types.UTF8String
 
 abstract class DataSourceV2SQLSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
-  with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase {
+  with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
+  with AdaptiveSparkPlanHelper {
 
   protected val v2Source = classOf[FakeV2Provider].getName
   override protected val v2Format = v2Source
@@ -2919,6 +2922,51 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-40045: Move the post-Scan Filters to the far right") {
+    val t1 = s"${catalogAndNamespace}table"
+    withUserDefinedFunction("udfStrLen" -> true) {
+      withTable(t1) {
+        spark.udf.register("udfStrLen", (str: String) => str.length)
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+        sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+        val filterBefore = spark.sql(
+          s"""
+             |SELECT id, data FROM $t1
+             |WHERE udfStrLen(data) = 1
+             |and id = 2
+             |""".stripMargin
+        )
+        val conditionBefore =
+          find(filterBefore.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
+            .head.asInstanceOf[FilterExec]
+            .condition
+        val expressionsBefore = splitConjunctivePredicates(conditionBefore)
+        assert(expressionsBefore.length == 3
+          && expressionsBefore(0).toString.trim.startsWith("isnotnull(id")
+          && expressionsBefore(1).toString.trim.startsWith("(id")
+          && expressionsBefore(2).toString.trim.startsWith("(udfStrLen(data"))
+
+        val filterAfter = spark.sql(
+          s"""
+             |SELECT id, data FROM $t1
+             |WHERE id = 2
+             |and udfStrLen(data) = 1
+             |""".stripMargin
+        )
+        val conditionAfter =
+          find(filterAfter.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
+            .head.asInstanceOf[FilterExec]
+            .condition
+        val expressionsAfter = splitConjunctivePredicates(conditionAfter)
+        assert(expressionsAfter.length == 3
+          && expressionsAfter(0).toString.trim.startsWith("isnotnull(id")
+          && expressionsAfter(1).toString.trim.startsWith("(id")
+          && expressionsAfter(2).toString.trim.startsWith("(udfStrLen(data"))
+      }
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     checkError(
       exception = intercept[AnalysisException] {
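
For reference, the user-visible effect mirrors the new test above and can be seen
from a physical plan. A hedged usage sketch, assuming a DataSource V2 catalog named
testcat with a table ns1.ns2.tbl whose scan builder pushes filters (all names here
are illustrative):

    spark.udf.register("udfStrLen", (s: String) => s.length)
    val df = spark.sql(
      "SELECT id, data FROM testcat.ns1.ns2.tbl WHERE udfStrLen(data) = 1 AND id = 2")
    // With this change the Filter node lists the translated predicates
    // (isnotnull(id), id = 2) before the UDF predicate, regardless of the order
    // in which they appear in the WHERE clause.
    df.explain()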


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org