You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/03 12:23:38 UTC

[spark] branch branch-3.2 updated: [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 45a9e032501 [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown
45a9e032501 is described below

commit 45a9e032501c51784ed0d4aa987ccbe4719ff7f4
Author: zzzzming95 <50...@qq.com>
AuthorDate: Wed Aug 3 21:22:55 2022 +0900

    [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown
    
    ### What changes were proposed in this pull request?
    
    Fix a bug in `BinaryFileFormat` filter pushdown.
    
    Before this change, when the filter tree is:
    ````
    -Not
    - - IsNotNull
    ````
    
    Because `IsNotNull` is not a supported filter, it falls through to the catch-all case `case _ => (_ => true)` and yields a predicate that is always true, i.e. the filter is effectively not pushed down. However, the enclosing `Not` then negates that predicate, so the combined filter is always false and no results are returned.
    
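    ### Why are the changes needed?
    
    Without this fix, a query whose pushed-down filters contain a `Not` over an unsupported filter incorrectly returns no results.
    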
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a test case to `BinaryFileFormatSuite`:
    
    ```
        testCreateFilterFunction(
          Seq(Not(IsNull(LENGTH))),
          Seq((t1, true), (t2, true), (t3, true)))
    ```
    
    Closes #37350 from zzzzming95/SPARK-39900.
    
    Lead-authored-by: zzzzming95 <50...@qq.com>
    Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit a0dc7d9117b66426aaa2257c8d448a2f96882ecd)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../datasources/binaryfile/BinaryFileFormat.scala  | 54 +++++++++++-----------
 .../binaryfile/BinaryFileFormatSuite.scala         |  5 +-
 2 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index 4b500aa9637..3874d70981b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -97,7 +97,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val filterFuncs = filters.map(filter => createFilterFunction(filter))
+    val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
     val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
@@ -158,38 +158,38 @@ object BinaryFileFormat {
     StructField(LENGTH, LongType, false) ::
     StructField(CONTENT, BinaryType, true) :: Nil)
 
-  private[binaryfile] def createFilterFunction(filter: Filter): FileStatus => Boolean = {
+  private[binaryfile] def createFilterFunction(filter: Filter): Option[FileStatus => Boolean] = {
     filter match {
-      case And(left, right) =>
-        s => createFilterFunction(left)(s) && createFilterFunction(right)(s)
-      case Or(left, right) =>
-        s => createFilterFunction(left)(s) || createFilterFunction(right)(s)
-      case Not(child) =>
-        s => !createFilterFunction(child)(s)
-
-      case LessThan(LENGTH, value: Long) =>
-        _.getLen < value
-      case LessThanOrEqual(LENGTH, value: Long) =>
-        _.getLen <= value
-      case GreaterThan(LENGTH, value: Long) =>
-        _.getLen > value
-      case GreaterThanOrEqual(LENGTH, value: Long) =>
-        _.getLen >= value
-      case EqualTo(LENGTH, value: Long) =>
-        _.getLen == value
-
+      case And(left, right) => (createFilterFunction(left), createFilterFunction(right)) match {
+        case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) && rightPred(s))
+        case (Some(leftPred), None) => Some(leftPred)
+        case (None, Some(rightPred)) => Some(rightPred)
+        case (None, None) => Some(_ => true)
+      }
+      case Or(left, right) => (createFilterFunction(left), createFilterFunction(right)) match {
+        case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) || rightPred(s))
+        case _ => Some(_ => true)
+      }
+      case Not(child) => createFilterFunction(child) match {
+        case Some(pred) => Some(s => !pred(s))
+        case _ => Some(_ => true)
+      }
+      case LessThan(LENGTH, value: Long) => Some(_.getLen < value)
+      case LessThanOrEqual(LENGTH, value: Long) => Some(_.getLen <= value)
+      case GreaterThan(LENGTH, value: Long) => Some(_.getLen > value)
+      case GreaterThanOrEqual(LENGTH, value: Long) => Some(_.getLen >= value)
+      case EqualTo(LENGTH, value: Long) => Some(_.getLen == value)
       case LessThan(MODIFICATION_TIME, value: Timestamp) =>
-        _.getModificationTime < value.getTime
+        Some(_.getModificationTime < value.getTime)
       case LessThanOrEqual(MODIFICATION_TIME, value: Timestamp) =>
-        _.getModificationTime <= value.getTime
+        Some(_.getModificationTime <= value.getTime)
       case GreaterThan(MODIFICATION_TIME, value: Timestamp) =>
-        _.getModificationTime > value.getTime
+        Some(_.getModificationTime > value.getTime)
       case GreaterThanOrEqual(MODIFICATION_TIME, value: Timestamp) =>
-        _.getModificationTime >= value.getTime
+        Some(_.getModificationTime >= value.getTime)
       case EqualTo(MODIFICATION_TIME, value: Timestamp) =>
-        _.getModificationTime == value.getTime
-
-      case _ => (_ => true)
+        Some(_.getModificationTime == value.getTime)
+      case _ => None
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 86ff026d7b1..9a374d5c302 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -183,7 +183,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
   def testCreateFilterFunction(
       filters: Seq[Filter],
       testCases: Seq[(FileStatus, Boolean)]): Unit = {
-    val funcs = filters.map(BinaryFileFormat.createFilterFunction)
+    val funcs = filters.flatMap(BinaryFileFormat.createFilterFunction)
     testCases.foreach { case (status, expected) =>
       assert(funcs.forall(f => f(status)) === expected,
         s"$filters applied to $status should be $expected.")
@@ -250,6 +250,9 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
       Seq(Or(LessThanOrEqual(MODIFICATION_TIME, new Timestamp(1L)),
         GreaterThanOrEqual(MODIFICATION_TIME, new Timestamp(3L)))),
       Seq((t1, true), (t2, false), (t3, true)))
+    testCreateFilterFunction(
+      Seq(Not(IsNull(LENGTH))),
+      Seq((t1, true), (t2, true), (t3, true)))
 
     // test filters applied on both columns
     testCreateFilterFunction(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org