Posted to commits@spark.apache.org by do...@apache.org on 2018/09/29 00:46:33 UTC
spark git commit: [SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible
Repository: spark
Updated Branches:
refs/heads/master 9362c5cc2 -> 5d726b865
[SPARK-25559][SQL] Remove the unsupported predicates in Parquet when possible
## What changes were proposed in this pull request?
Currently, in `ParquetFilters`, if one of the child predicates is not supported by Parquet, the entire predicate is thrown away. In fact, if the unsupported predicate is in the top-level `And` condition, or in an `And` child reached before hitting a `Not` or `Or` condition, it can be safely removed.
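For intuition, here is a small, self-contained Scala sketch (toy tuples and plain functions, not Spark's `sources.Filter` API) replaying the `NOT(a = 2 AND b in ('1'))` example from the code comment in the diff below: dropping the unsupported conjunct only weakens a bare `And`, so no matching row is lost, but the same removal under `Not` drops rows that should be kept.

```scala
// Toy illustration only; rows are (a, b) pairs and predicates are plain Scala functions.
object AndRemovalIntuition extends App {
  val rows = Seq((1, "1"), (2, "1"), (2, "2"), (3, "3"))

  // Full predicate: a = 2 AND b in ('1').
  val full: ((Int, String)) => Boolean = { case (a, b) => a == 2 && Set("1").contains(b) }
  // Suppose `b in ('1')` cannot be pushed down; only `a = 2` survives.
  val weakened: ((Int, String)) => Boolean = { case (a, _) => a == 2 }

  // Safe at the top level: every row matching the full predicate also matches the
  // weakened one, so filtering with `a = 2` never loses a matching row.
  assert(rows.filter(full).forall(weakened))

  // Unsafe under NOT: NOT(a = 2) wrongly drops (2, "2"), which does satisfy
  // NOT(a = 2 AND b in ('1')).
  assert(rows.filterNot(full).toSet != rows.filterNot(weakened).toSet)
}
```

This is why the patch threads a `canRemoveOneSideInAnd` flag through the conversion and resets it to `false` when recursing under `Not` and `Or`.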
## How was this patch tested?
Tests are added.
Closes #22574 from dbtsai/removeUnsupportedPredicatesInParquet.
Lead-authored-by: DB Tsai <d_...@apple.com>
Co-authored-by: Dongjoon Hyun <do...@apache.org>
Co-authored-by: DB Tsai <db...@dbtsai.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d726b86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d726b86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d726b86
Branch: refs/heads/master
Commit: 5d726b865948f993911fd5b9730b25cfa94e16c7
Parents: 9362c5c
Author: DB Tsai <d_...@apple.com>
Authored: Fri Sep 28 17:46:11 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Fri Sep 28 17:46:11 2018 -0700
----------------------------------------------------------------------
.../datasources/parquet/ParquetFilters.scala | 38 +++--
.../parquet/ParquetFilterSuite.scala | 147 ++++++++++++++++++-
2 files changed, 172 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0c286de..44a0d20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -394,7 +394,13 @@ private[parquet] class ParquetFilters(
*/
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToParquetField = getFieldMap(schema)
+ createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
+ }
+ private def createFilterHelper(
+ nameToParquetField: Map[String, ParquetField],
+ predicate: sources.Filter,
+ canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
// Decimal type must make sure that the filter value's scale matches the file schema's scale.
// If it doesn't match, it would cause data corruption.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
@@ -488,26 +494,36 @@ private[parquet] class ParquetFilters(
.map(_(nameToParquetField(name).fieldName, value))
case sources.And(lhs, rhs) =>
- // At here, it is not safe to just convert one side if we do not understand the
- // other side. Here is an example used to explain the reason.
+ // At here, it is not safe to just convert one side and remove the other side
+ // if we do not understand what the parent filters are.
+ //
+ // Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
- // Pushing one side of AND down is only safe to do at the top level.
- // You can see ParquetRelation's initializeLocalJobFunc method as an example.
- for {
- lhsFilter <- createFilter(schema, lhs)
- rhsFilter <- createFilter(schema, rhs)
- } yield FilterApi.and(lhsFilter, rhsFilter)
+ //
+ // Pushing one side of AND down is only safe to do at the top level or in the child
+ // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+ // can be safely removed.
+ val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+ val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+
+ (lhsFilterOption, rhsFilterOption) match {
+ case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
+ case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+ case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+ case _ => None
+ }
case sources.Or(lhs, rhs) =>
for {
- lhsFilter <- createFilter(schema, lhs)
- rhsFilter <- createFilter(schema, rhs)
+ lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+ rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
} yield FilterApi.or(lhsFilter, rhsFilter)
case sources.Not(pred) =>
- createFilter(schema, pred).map(FilterApi.not)
+ createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+ .map(FilterApi.not)
case sources.In(name, values) if canMakeFilterOn(name, values.head)
&& values.distinct.length <= pushDownInFilterThreshold =>
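
For readers skimming the hunk above, the following is a minimal, self-contained sketch of the same `canRemoveOneSideInAnd` propagation on a made-up toy filter algebra (the `Toy*` case classes and the string output are invented for illustration; the real code works on `sources.Filter` and Parquet's `FilterPredicate`):

```scala
// Toy sketch of the canRemoveOneSideInAnd propagation; made-up types, not Spark's.
object FlagPropagationSketch extends App {
  sealed trait Toy
  case class Leaf(name: String, supported: Boolean) extends Toy
  case class ToyAnd(l: Toy, r: Toy) extends Toy
  case class ToyOr(l: Toy, r: Toy) extends Toy
  case class ToyNot(child: Toy) extends Toy

  def convert(f: Toy, canRemoveOneSideInAnd: Boolean = true): Option[String] = f match {
    case Leaf(n, ok) => if (ok) Some(n) else None
    case ToyAnd(l, r) =>
      (convert(l, canRemoveOneSideInAnd), convert(r, canRemoveOneSideInAnd)) match {
        case (Some(lf), Some(rf)) => Some(s"and($lf, $rf)")
        case (Some(lf), None) if canRemoveOneSideInAnd => Some(lf) // drop unsupported side
        case (None, Some(rf)) if canRemoveOneSideInAnd => Some(rf)
        case _ => None
      }
    case ToyOr(l, r) =>
      // Under OR (and NOT, below) dropping a conjunct is no longer safe, so the flag is reset.
      for {
        lf <- convert(l, canRemoveOneSideInAnd = false)
        rf <- convert(r, canRemoveOneSideInAnd = false)
      } yield s"or($lf, $rf)"
    case ToyNot(c) => convert(c, canRemoveOneSideInAnd = false).map(p => s"not($p)")
  }

  // A top-level AND with one unsupported side is weakened to the supported side...
  assert(convert(ToyAnd(Leaf("a < 10", supported = true), Leaf("contains(b)", supported = false)))
    .contains("a < 10"))
  // ...but the same AND wrapped in NOT produces no pushed filter at all.
  assert(convert(ToyNot(ToyAnd(Leaf("a < 10", supported = true), Leaf("contains(b)", supported = false))))
    .isEmpty)
}
```

The two asserts mirror the new test cases in the suite below: a top-level `And` with one unsupported side converts to the supported side, while the same `And` under `Not` converts to `None`.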
http://git-wip-us.apache.org/repos/asf/spark/blob/5d726b86/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7ebb750..01e41b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
- test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+ test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false),
StructField("b", StringType, nullable = true),
@@ -770,7 +770,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("c", 1.5D)))
}
- assertResult(None) {
+ // Testing when `canRemoveOneSideInAnd == true`
+ // case sources.And(lhs, rhs) =>
+ // ...
+ // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
+ assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
sources.And(
@@ -778,6 +782,83 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.StringContains("b", "prefix")))
}
+ // Testing when `canRemoveOneSideInAnd == true`
+ // case sources.And(lhs, rhs) =>
+ // ...
+ // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+ assertResult(Some(lt(intColumn("a"), 10: Integer))) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.And(
+ sources.StringContains("b", "prefix"),
+ sources.LessThan("a", 10)))
+ }
+
+ // Testing complex And conditions
+ assertResult(Some(
+ FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.And(
+ sources.And(
+ sources.LessThan("a", 10),
+ sources.StringContains("b", "prefix")
+ ),
+ sources.GreaterThan("a", 5)))
+ }
+
+ // Testing complex And conditions
+ assertResult(Some(
+ FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.And(
+ sources.GreaterThan("a", 5),
+ sources.And(
+ sources.StringContains("b", "prefix"),
+ sources.LessThan("a", 10)
+ )))
+ }
+
+ // Testing
+ // case sources.Or(lhs, rhs) =>
+ // ...
+ // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
+ assertResult(None) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.Or(
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix")),
+ sources.GreaterThan("a", 2)))
+ }
+
+ // Testing
+ // case sources.Or(lhs, rhs) =>
+ // ...
+ // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
+ assertResult(None) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.Or(
+ sources.GreaterThan("a", 2),
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix"))))
+ }
+
+ // Testing
+ // case sources.Not(pred) =>
+ // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+ // .map(FilterApi.not)
+ //
+ // and
+ //
+ // Testing when `canRemoveOneSideInAnd == false`
+ // case sources.And(lhs, rhs) =>
+ // ...
+ // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
@@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
}
+
+ // Testing
+ // case sources.Not(pred) =>
+ // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+ // .map(FilterApi.not)
+ //
+ // and
+ //
+ // Testing when `canRemoveOneSideInAnd == false`
+ // case sources.And(lhs, rhs) =>
+ // ...
+ // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+ assertResult(None) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.Not(
+ sources.And(
+ sources.StringContains("b", "prefix"),
+ sources.GreaterThan("a", 1))))
+ }
+
+ // Testing
+ // case sources.Not(pred) =>
+ // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+ // .map(FilterApi.not)
+ //
+ // and
+ //
+ // Testing passing `canRemoveOneSideInAnd = false` into
+ // case sources.And(lhs, rhs) =>
+ // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
+ assertResult(None) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.Not(
+ sources.And(
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix")),
+ sources.GreaterThan("a", 2))))
+ }
+
+ // Testing
+ // case sources.Not(pred) =>
+ // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
+ // .map(FilterApi.not)
+ //
+ // and
+ //
+ // Testing passing `canRemoveOneSideInAnd = false` into
+ // case sources.And(lhs, rhs) =>
+ // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+ assertResult(None) {
+ parquetFilters.createFilter(
+ parquetSchema,
+ sources.Not(
+ sources.And(
+ sources.GreaterThan("a", 2),
+ sources.And(
+ sources.GreaterThan("a", 1),
+ sources.StringContains("b", "prefix")))))
+ }
}
test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {