You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/08 11:18:54 UTC
[spark] branch master updated: [SPARK-26004][SQL] InMemoryTable
support StartsWith predicate push down
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2036074 [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
2036074 is described below
commit 2036074b996d3cdd6aaace31a79c825797a6997c
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Mar 8 19:18:32 2019 +0800
[SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
## What changes were proposed in this pull request?
[SPARK-24638](https://issues.apache.org/jira/browse/SPARK-24638) adds support for Parquet file `StartsWith` predicate push down.
`InMemoryTable` can also support this feature.
This is an example to explain how it works, Imagine that the `id` column stored as below:
Partition ID | lowerBound | upperBound
-- | -- | --
p1 | '1' | '9'
p2 | '10' | '19'
p3 | '20' | '29'
p4 | '30' | '39'
p5 | '40' | '49'
A filter ```df.filter($"id".startsWith("2"))``` or ```id like '2%'```
then we substr lowerBound and upperBound:
Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2"))
-- | -- | --
p1 | '1' | '9'
p2 | '1' | '1'
p3 | '2' | '2'
p4 | '3' | '3'
p5 | '4' | '4'
We can see that we only need to read `p1` and `p3`.
## How was this patch tested?
unit tests and benchmark tests
benchmark test result:
```
================================================================================================
Pushdown benchmark for StringStartsWith
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '10%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized 12068 / 14198 1.3 767.3 1.0X
InMemoryTable Vectorized (Pushdown) 5457 / 8662 2.9 347.0 2.2X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '1000%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized 5246 / 5355 3.0 333.5 1.0X
InMemoryTable Vectorized (Pushdown) 2185 / 2346 7.2 138.9 2.4X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz
StringStartsWith filter: (value like '786432%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
InMemoryTable Vectorized 5112 / 5312 3.1 325.0 1.0X
InMemoryTable Vectorized (Pushdown) 2292 / 2522 6.9 145.7 2.2X
```
Closes #23004 from wangyum/SPARK-26004.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/columnar/InMemoryTableScanExec.scala | 30 +++++++++++++++++++++-
.../columnar/PartitionBatchPruningSuite.scala | 9 +++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 8f8d801..b827878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -194,7 +194,7 @@ case class InMemoryTableScanExec(
}
// Returned filter predicate should return false iff it is impossible for the input expression
- // to evaluate to `true' based on statistics collected about this partition batch.
+ // to evaluate to `true` based on statistics collected about this partition batch.
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
@@ -237,6 +237,34 @@ case class InMemoryTableScanExec(
if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
+
+ // This is an example to explain how it works, imagine that the id column stored as follows:
+ // __________________________________________
+ // | Partition ID | lowerBound | upperBound |
+ // |--------------|------------|------------|
+ // | p1 | '1' | '9' |
+ // | p2 | '10' | '19' |
+ // | p3 | '20' | '29' |
+ // | p4 | '30' | '39' |
+ // | p5 | '40' | '49' |
+ // |______________|____________|____________|
+ //
+ // A filter: df.filter($"id".startsWith("2")).
+ // In this case it substr lowerBound and upperBound:
+ // ________________________________________________________________________________________
+ // | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
+ // |--------------|-----------------------------------|-----------------------------------|
+ // | p1 | '1' | '9' |
+ // | p2 | '1' | '1' |
+ // | p3 | '2' | '2' |
+ // | p4 | '3' | '3' |
+ // | p5 | '4' | '4' |
+ // |______________|___________________________________|___________________________________|
+ //
+ // We can see that we only need to read p1 and p3.
+ case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
+ statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
+ l <= statsFor(a).upperBound.substr(0, Length(l))
}
lazy val partitionFilters: Seq[Expression] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index af493e9..b3a5c68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -170,6 +170,15 @@ class PartitionBatchPruningSuite
}
}
+ // Support `StartsWith` predicate
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '18%'", 1, 1)(
+ 180 to 189
+ )
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '%'", 5, 11)(
+ 100 to 200
+ )
+ checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE '18%' like s", 5, 11)(Seq())
+
// With disable IN_MEMORY_PARTITION_PRUNING option
test("disable IN_MEMORY_PARTITION_PRUNING") {
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org