You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/09/29 00:46:33 UTC

spark git commit: [SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible

Repository: spark
Updated Branches:
  refs/heads/master 9362c5cc2 -> 5d726b865


[SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible

## What changes were proposed in this pull request?

Currently, in `ParquetFilters`, if any one of the child predicates is not supported by Parquet, the entire predicate will be thrown away. In fact, if the unsupported predicate is in a top-level `And` condition, or in a child `And` reached before hitting a `Not` or `Or` condition, it can be safely removed while the supported side is still pushed down.

## How was this patch tested?

Tests are added.

Closes #22574 from dbtsai/removeUnsupportedPredicatesInParquet.

Lead-authored-by: DB Tsai <d_...@apple.com>
Co-authored-by: Dongjoon Hyun <do...@apache.org>
Co-authored-by: DB Tsai <db...@dbtsai.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d726b86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d726b86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d726b86

Branch: refs/heads/master
Commit: 5d726b865948f993911fd5b9730b25cfa94e16c7
Parents: 9362c5c
Author: DB Tsai <d_...@apple.com>
Authored: Fri Sep 28 17:46:11 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Fri Sep 28 17:46:11 2018 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    |  38 +++--
 .../parquet/ParquetFilterSuite.scala            | 147 ++++++++++++++++++-
 2 files changed, 172 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0c286de..44a0d20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -394,7 +394,13 @@ private[parquet] class ParquetFilters(
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToParquetField = getFieldMap(schema)
+    createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
+  }
 
+  private def createFilterHelper(
+      nameToParquetField: Map[String, ParquetField],
+      predicate: sources.Filter,
+      canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
     // Decimal type must make sure that filter value's scale matched the file.
     // If doesn't matched, which would cause data corruption.
     def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
@@ -488,26 +494,36 @@ private[parquet] class ParquetFilters(
           .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.And(lhs, rhs) =>
-        // At here, it is not safe to just convert one side if we do not understand the
-        // other side. Here is an example used to explain the reason.
+        // At here, it is not safe to just convert one side and remove the other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
         // convert b in ('1'). If we only convert a = 2, we will end up with a filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
-        for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
-        } yield FilterApi.and(lhsFilter, rhsFilter)
+        //
+        // Pushing one side of AND down is only safe to do at the top level or in the child
+        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+        // can be safely removed.
+        val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+        val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+
+        (lhsFilterOption, rhsFilterOption) match {
+          case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
+          case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+          case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+          case _ => None
+        }
 
       case sources.Or(lhs, rhs) =>
         for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
+          lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+          rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
         } yield FilterApi.or(lhsFilter, rhsFilter)
 
       case sources.Not(pred) =>
-        createFilter(schema, pred).map(FilterApi.not)
+        createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+          .map(FilterApi.not)
 
       case sources.In(name, values) if canMakeFilterOn(name, values.head)
         && values.distinct.length <= pushDownInFilterThreshold =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7ebb750..01e41b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+  test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") {
     val schema = StructType(Seq(
       StructField("a", IntegerType, nullable = false),
       StructField("b", StringType, nullable = true),
@@ -770,7 +770,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           sources.GreaterThan("c", 1.5D)))
     }
 
-    assertResult(None) {
+    // Testing when `canRemoveOneSideInAnd == true`
+    // case sources.And(lhs, rhs) =>
+    //   ...
+    //     case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+    assertResult(Some(lt(intColumn("a"), 10: Integer))) {
       parquetFilters.createFilter(
         parquetSchema,
         sources.And(
@@ -778,6 +782,83 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           sources.StringContains("b", "prefix")))
     }
 
+    // Testing when `canRemoveOneSideInAnd == true`
+    // case sources.And(lhs, rhs) =>
+    //   ...
+    //     case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+    assertResult(Some(lt(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.StringContains("b", "prefix"),
+          sources.LessThan("a", 10)))
+    }
+
+    // Testing complex And conditions
+    assertResult(Some(
+      FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.And(
+            sources.LessThan("a", 10),
+            sources.StringContains("b", "prefix")
+          ),
+          sources.GreaterThan("a", 5)))
+    }
+
+    // Testing complex And conditions
+    assertResult(Some(
+      FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.And(
+          sources.GreaterThan("a", 5),
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.LessThan("a", 10)
+          )))
+    }
+
+    // Testing
+    // case sources.Or(lhs, rhs) =>
+    //   ...
+    //     lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Or(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix")),
+          sources.GreaterThan("a", 2)))
+    }
+
+    // Testing
+    // case sources.Or(lhs, rhs) =>
+    //   ...
+    //     rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Or(
+          sources.GreaterThan("a", 2),
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix"))))
+    }
+
+    // Testing
+    // case sources.Not(pred) =>
+    //   createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //     .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing when `canRemoveOneSideInAnd == false`
+    // case sources.And(lhs, rhs) =>
+    //   ...
+    //     case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
     assertResult(None) {
       parquetFilters.createFilter(
         parquetSchema,
@@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
             sources.GreaterThan("a", 1),
             sources.StringContains("b", "prefix"))))
     }
+
+    // Testing
+    // case sources.Not(pred) =>
+    //   createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //     .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing when `canRemoveOneSideInAnd == false`
+    // case sources.And(lhs, rhs) =>
+    //   ...
+    //     case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.GreaterThan("a", 1))))
+    }
+
+    // Testing
+    // case sources.Not(pred) =>
+    //   createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //     .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing passing `canRemoveOneSideInAnd = false` into
+    // case sources.And(lhs, rhs) =>
+    //   val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")),
+            sources.GreaterThan("a", 2))))
+    }
+
+    // Testing
+    // case sources.Not(pred) =>
+    //   createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+    //     .map(FilterApi.not)
+    //
+    // and
+    //
+    // Testing passing `canRemoveOneSideInAnd = false` into
+    // case sources.And(lhs, rhs) =>
+    //   val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+    assertResult(None) {
+      parquetFilters.createFilter(
+        parquetSchema,
+        sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 2),
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")))))
+    }
   }
 
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org