You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/12/23 07:08:42 UTC

spark git commit: [SPARK-11164][SQL] Add InSet pushdown filter back for Parquet

Repository: spark
Updated Branches:
  refs/heads/master 86761e10e -> 50301c0a2


[SPARK-11164][SQL] Add InSet pushdown filter back for Parquet

When the filter is ```"b in ('1', '2')"```, the filter is not pushed down to Parquet. Thanks!

Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>

Closes #10278 from gatorsmile/parquetFilterNot.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50301c0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50301c0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50301c0a

Branch: refs/heads/master
Commit: 50301c0a28b64c5348b0f2c2d828589c0833c70c
Parents: 86761e1
Author: Liang-Chi Hsieh <vi...@viirya.org>
Authored: Wed Dec 23 14:08:29 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Dec 23 14:08:29 2015 +0800

----------------------------------------------------------------------
 .../optimizer/BooleanSimplificationSuite.scala  | 20 +++++++------
 .../datasources/parquet/ParquetFilters.scala    |  3 ++
 .../parquet/ParquetFilterSuite.scala            | 30 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index cde346e..a0c71d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -86,23 +86,27 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
 
     checkCondition(
       ('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5),
-      ('a === 'b || 'b > 3 && 'a > 3 && 'a < 5))
+      'a === 'b || 'b > 3 && 'a > 3 && 'a < 5)
   }
 
   test("a && (!a || b)") {
-    checkCondition(('a && (!('a) || 'b )), ('a && 'b))
+    checkCondition('a && (!'a || 'b ), 'a && 'b)
 
-    checkCondition(('a && ('b || !('a) )), ('a && 'b))
+    checkCondition('a && ('b || !'a ), 'a && 'b)
 
-    checkCondition(((!('a) || 'b ) && 'a), ('b && 'a))
+    checkCondition((!'a || 'b ) && 'a, 'b && 'a)
 
-    checkCondition((('b || !('a) ) && 'a), ('b && 'a))
+    checkCondition(('b || !'a ) && 'a, 'b && 'a)
   }
 
-  test("!(a && b) , !(a || b)") {
-    checkCondition((!('a && 'b)), (!('a) || !('b)))
+  test("DeMorgan's law") {
+    checkCondition(!('a && 'b), !'a || !'b)
 
-    checkCondition(!('a || 'b), (!('a) && !('b)))
+    checkCondition(!('a || 'b), !'a && !'b)
+
+    checkCondition(!(('a && 'b) || ('c && 'd)), (!'a || !'b) && (!'c || !'d))
+
+    checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
   }
 
   private val caseInsensitiveAnalyzer =

http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 883013b..ac9b65b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -256,6 +256,9 @@ private[sql] object ParquetFilters {
       case sources.GreaterThanOrEqual(name, value) =>
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
+      case sources.In(name, valueSet) =>
+        makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
+
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
         // other side. Here is an example used to explain the reason.

http://git-wip-us.apache.org/repos/asf/spark/blob/50301c0a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 045425f..9197b8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -381,4 +381,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11164: test the parquet filter in") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/table1"
+          (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
+
+          // When a filter is pushed to Parquet, Parquet can apply it to every row.
+          // So, we can check the number of rows returned from the Parquet
+          // to make sure our filter pushdown work.
+          val df = sqlContext.read.parquet(path).where("b in (0,2)")
+          assert(stripSparkFilter(df).count == 3)
+
+          val df1 = sqlContext.read.parquet(path).where("not (b in (1))")
+          assert(stripSparkFilter(df1).count == 3)
+
+          val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)")
+          assert(stripSparkFilter(df2).count == 2)
+
+          val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)")
+          assert(stripSparkFilter(df3).count == 4)
+
+          val df4 = sqlContext.read.parquet(path).where("not (a <= 2)")
+          assert(stripSparkFilter(df4).count == 3)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org