You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/12/23 07:08:42 UTC
spark git commit: [SPARK-11164][SQL] Add InSet pushdown filter back
for Parquet
Repository: spark
Updated Branches:
refs/heads/master 86761e10e -> 50301c0a2
[SPARK-11164][SQL] Add InSet pushdown filter back for Parquet
When the filter is `b in ('1', '2')`, it is not pushed down to Parquet. This commit adds the `In` filter pushdown back. Thanks!
Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>
Closes #10278 from gatorsmile/parquetFilterNot.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50301c0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50301c0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50301c0a
Branch: refs/heads/master
Commit: 50301c0a28b64c5348b0f2c2d828589c0833c70c
Parents: 86761e1
Author: Liang-Chi Hsieh <vi...@viirya.org>
Authored: Wed Dec 23 14:08:29 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Dec 23 14:08:29 2015 +0800
----------------------------------------------------------------------
.../optimizer/BooleanSimplificationSuite.scala | 20 +++++++------
.../datasources/parquet/ParquetFilters.scala | 3 ++
.../parquet/ParquetFilterSuite.scala | 30 ++++++++++++++++++++
3 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index cde346e..a0c71d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -86,23 +86,27 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
checkCondition(
('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5),
- ('a === 'b || 'b > 3 && 'a > 3 && 'a < 5))
+ 'a === 'b || 'b > 3 && 'a > 3 && 'a < 5)
}
test("a && (!a || b)") {
- checkCondition(('a && (!('a) || 'b )), ('a && 'b))
+ checkCondition('a && (!'a || 'b ), 'a && 'b)
- checkCondition(('a && ('b || !('a) )), ('a && 'b))
+ checkCondition('a && ('b || !'a ), 'a && 'b)
- checkCondition(((!('a) || 'b ) && 'a), ('b && 'a))
+ checkCondition((!'a || 'b ) && 'a, 'b && 'a)
- checkCondition((('b || !('a) ) && 'a), ('b && 'a))
+ checkCondition(('b || !'a ) && 'a, 'b && 'a)
}
- test("!(a && b) , !(a || b)") {
- checkCondition((!('a && 'b)), (!('a) || !('b)))
+ test("DeMorgan's law") {
+ checkCondition(!('a && 'b), !'a || !'b)
- checkCondition(!('a || 'b), (!('a) && !('b)))
+ checkCondition(!('a || 'b), !'a && !'b)
+
+ checkCondition(!(('a && 'b) || ('c && 'd)), (!'a || !'b) && (!'c || !'d))
+
+ checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
}
private val caseInsensitiveAnalyzer =
http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 883013b..ac9b65b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -256,6 +256,9 @@ private[sql] object ParquetFilters {
case sources.GreaterThanOrEqual(name, value) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.In(name, valueSet) =>
+ makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
+
case sources.And(lhs, rhs) =>
// Here, it is not safe to just convert one side if we do not understand the
// other side. Here is an example used to explain the reason.
http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 045425f..9197b8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -381,4 +381,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("SPARK-11164: test the parquet filter in") {
+ import testImplicits._
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table1"
+ (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
+
+ // When a filter is pushed to Parquet, Parquet can apply it to every row.
+ // So, we can check the number of rows returned from the Parquet
+ // to make sure our filter pushdown works.
+ val df = sqlContext.read.parquet(path).where("b in (0,2)")
+ assert(stripSparkFilter(df).count == 3)
+
+ val df1 = sqlContext.read.parquet(path).where("not (b in (1))")
+ assert(stripSparkFilter(df1).count == 3)
+
+ val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)")
+ assert(stripSparkFilter(df2).count == 2)
+
+ val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)")
+ assert(stripSparkFilter(df3).count == 4)
+
+ val df4 = sqlContext.read.parquet(path).where("not (a <= 2)")
+ assert(stripSparkFilter(df4).count == 3)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org