You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/03 12:23:38 UTC
[spark] branch branch-3.2 updated: [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 45a9e032501 [SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown
45a9e032501 is described below
commit 45a9e032501c51784ed0d4aa987ccbe4719ff7f4
Author: zzzzming95 <50...@qq.com>
AuthorDate: Wed Aug 3 21:22:55 2022 +0900
[SPARK-39900][SQL] Address partial or negated condition in binary format's predicate pushdown
### What changes were proposed in this pull request?
Fix a filter pushdown bug in `BinaryFileFormat`.
Before this change, consider the case where the filter tree is:
````
-Not
- - IsNotNull
````
Since `IsNotNull` is not matched by any supported case, it falls through to the default branch (`case _ => (_ => true)`) and yields an always-true predicate, meaning no filter pushdown is performed for it. However, the enclosing `Not` then negates that predicate, producing an always-false predicate, so every file is filtered out and no result can be returned.
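### Why are the changes needed?
To fix a correctness bug: a negated filter that cannot be pushed down (for example `Not(IsNotNull(...))`) was converted into an always-false predicate, so the query returned no rows.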
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test case to `BinaryFileFormatSuite`:
```
testCreateFilterFunction(
Seq(Not(IsNull(LENGTH))),
Seq((t1, true), (t2, true), (t3, true)))
```
Closes #37350 from zzzzming95/SPARK-39900.
Lead-authored-by: zzzzming95 <50...@qq.com>
Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit a0dc7d9117b66426aaa2257c8d448a2f96882ecd)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../datasources/binaryfile/BinaryFileFormat.scala | 54 +++++++++++-----------
.../binaryfile/BinaryFileFormatSuite.scala | 5 +-
2 files changed, 31 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index 4b500aa9637..3874d70981b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -97,7 +97,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val filterFuncs = filters.map(filter => createFilterFunction(filter))
+ val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
file: PartitionedFile => {
@@ -158,38 +158,38 @@ object BinaryFileFormat {
StructField(LENGTH, LongType, false) ::
StructField(CONTENT, BinaryType, true) :: Nil)
- private[binaryfile] def createFilterFunction(filter: Filter): FileStatus => Boolean = {
+ private[binaryfile] def createFilterFunction(filter: Filter): Option[FileStatus => Boolean] = {
filter match {
- case And(left, right) =>
- s => createFilterFunction(left)(s) && createFilterFunction(right)(s)
- case Or(left, right) =>
- s => createFilterFunction(left)(s) || createFilterFunction(right)(s)
- case Not(child) =>
- s => !createFilterFunction(child)(s)
-
- case LessThan(LENGTH, value: Long) =>
- _.getLen < value
- case LessThanOrEqual(LENGTH, value: Long) =>
- _.getLen <= value
- case GreaterThan(LENGTH, value: Long) =>
- _.getLen > value
- case GreaterThanOrEqual(LENGTH, value: Long) =>
- _.getLen >= value
- case EqualTo(LENGTH, value: Long) =>
- _.getLen == value
-
+ case And(left, right) => (createFilterFunction(left), createFilterFunction(right)) match {
+ case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) && rightPred(s))
+ case (Some(leftPred), None) => Some(leftPred)
+ case (None, Some(rightPred)) => Some(rightPred)
+ case (None, None) => Some(_ => true)
+ }
+ case Or(left, right) => (createFilterFunction(left), createFilterFunction(right)) match {
+ case (Some(leftPred), Some(rightPred)) => Some(s => leftPred(s) || rightPred(s))
+ case _ => Some(_ => true)
+ }
+ case Not(child) => createFilterFunction(child) match {
+ case Some(pred) => Some(s => !pred(s))
+ case _ => Some(_ => true)
+ }
+ case LessThan(LENGTH, value: Long) => Some(_.getLen < value)
+ case LessThanOrEqual(LENGTH, value: Long) => Some(_.getLen <= value)
+ case GreaterThan(LENGTH, value: Long) => Some(_.getLen > value)
+ case GreaterThanOrEqual(LENGTH, value: Long) => Some(_.getLen >= value)
+ case EqualTo(LENGTH, value: Long) => Some(_.getLen == value)
case LessThan(MODIFICATION_TIME, value: Timestamp) =>
- _.getModificationTime < value.getTime
+ Some(_.getModificationTime < value.getTime)
case LessThanOrEqual(MODIFICATION_TIME, value: Timestamp) =>
- _.getModificationTime <= value.getTime
+ Some(_.getModificationTime <= value.getTime)
case GreaterThan(MODIFICATION_TIME, value: Timestamp) =>
- _.getModificationTime > value.getTime
+ Some(_.getModificationTime > value.getTime)
case GreaterThanOrEqual(MODIFICATION_TIME, value: Timestamp) =>
- _.getModificationTime >= value.getTime
+ Some(_.getModificationTime >= value.getTime)
case EqualTo(MODIFICATION_TIME, value: Timestamp) =>
- _.getModificationTime == value.getTime
-
- case _ => (_ => true)
+ Some(_.getModificationTime == value.getTime)
+ case _ => None
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 86ff026d7b1..9a374d5c302 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -183,7 +183,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
def testCreateFilterFunction(
filters: Seq[Filter],
testCases: Seq[(FileStatus, Boolean)]): Unit = {
- val funcs = filters.map(BinaryFileFormat.createFilterFunction)
+ val funcs = filters.flatMap(BinaryFileFormat.createFilterFunction)
testCases.foreach { case (status, expected) =>
assert(funcs.forall(f => f(status)) === expected,
s"$filters applied to $status should be $expected.")
@@ -250,6 +250,9 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
Seq(Or(LessThanOrEqual(MODIFICATION_TIME, new Timestamp(1L)),
GreaterThanOrEqual(MODIFICATION_TIME, new Timestamp(3L)))),
Seq((t1, true), (t2, false), (t3, true)))
+ testCreateFilterFunction(
+ Seq(Not(IsNull(LENGTH))),
+ Seq((t1, true), (t2, true), (t3, true)))
// test filters applied on both columns
testCreateFilterFunction(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org