Posted to commits@spark.apache.org by we...@apache.org on 2019/03/08 11:18:54 UTC

[spark] branch master updated: [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2036074  [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
2036074 is described below

commit 2036074b996d3cdd6aaace31a79c825797a6997c
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Mar 8 19:18:32 2019 +0800

    [SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
    
    ## What changes were proposed in this pull request?
    
    [SPARK-24638](https://issues.apache.org/jira/browse/SPARK-24638) adds support for Parquet file `StartsWith` predicate push down.
    `InMemoryTable` can also support this feature.
    
    This is an example to explain how it works. Imagine that the `id` column is stored as below:
    
    Partition ID | lowerBound | upperBound
    -- | -- | --
    p1 | '1' | '9'
    p2 | '10' | '19'
    p3 | '20' | '29'
    p4 | '30' | '39'
    p5 | '40' | '49'
    
    Given a filter ```df.filter($"id".startsWith("2"))``` or ```id like '2%'```,
    we take the substr of lowerBound and upperBound:
    
    Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2"))
    -- | -- | --
    p1 | '1' | '9'
    p2 | '1' | '1'
    p3 | '2' | '2'
    p4 | '3' | '3'
    p5 | '4' | '4'
    
    We can see that we only need to read `p1` and `p3`.
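
    As an illustration only, here is a minimal self-contained Scala sketch of this pruning check (plain Scala, not the actual Spark `buildFilter` expressions; `BatchStats` and `mayStartWith` are hypothetical names):

    ```
    // Hypothetical stand-in for the per-batch string statistics.
    case class BatchStats(id: String, lowerBound: String, upperBound: String)

    // Mirrors lowerBound.substr(0, Length(l)) <= l && l <= upperBound.substr(0, Length(l)).
    def mayStartWith(stats: BatchStats, prefix: String): Boolean = {
      val n = prefix.length
      stats.lowerBound.take(n) <= prefix && prefix <= stats.upperBound.take(n)
    }

    val batches = Seq(
      BatchStats("p1", "1", "9"), BatchStats("p2", "10", "19"),
      BatchStats("p3", "20", "29"), BatchStats("p4", "30", "39"),
      BatchStats("p5", "40", "49"))

    // Returns p1 and p3: only those batches can contain values starting with "2".
    batches.filter(mayStartWith(_, "2")).map(_.id)
    ```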
    
    ## How was this patch tested?
    
    Unit tests and benchmark tests.
    
    benchmark test result:
    ```
    ================================================================================================
    Pushdown benchmark for StringStartsWith
    ================================================================================================
    
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
    Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
    StringStartsWith filter: (value like '10%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    InMemoryTable Vectorized                    12068 / 14198          1.3         767.3       1.0X
    InMemoryTable Vectorized (Pushdown)           5457 / 8662          2.9         347.0       2.2X
    
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
    Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
    StringStartsWith filter: (value like '1000%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    InMemoryTable Vectorized                      5246 / 5355          3.0         333.5       1.0X
    InMemoryTable Vectorized (Pushdown)           2185 / 2346          7.2         138.9       2.4X
    
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6
    Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
    StringStartsWith filter: (value like '786432%'): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    InMemoryTable Vectorized                      5112 / 5312          3.1         325.0       1.0X
    InMemoryTable Vectorized (Pushdown)           2292 / 2522          6.9         145.7       2.2X
    ```
    
    Closes #23004 from wangyum/SPARK-26004.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/columnar/InMemoryTableScanExec.scala | 30 +++++++++++++++++++++-
 .../columnar/PartitionBatchPruningSuite.scala      |  9 +++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 8f8d801..b827878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -194,7 +194,7 @@ case class InMemoryTableScanExec(
   }
 
   // Returned filter predicate should return false iff it is impossible for the input expression
-  // to evaluate to `true' based on statistics collected about this partition batch.
+  // to evaluate to `true` based on statistics collected about this partition batch.
   @transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
@@ -237,6 +237,34 @@ case class InMemoryTableScanExec(
       if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
       list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
         l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
+
+    // This is an example to explain how it works. Imagine that the id column is stored as follows:
+    // __________________________________________
+    // | Partition ID | lowerBound | upperBound |
+    // |--------------|------------|------------|
+    // |      p1      |    '1'     |    '9'     |
+    // |      p2      |    '10'    |    '19'    |
+    // |      p3      |    '20'    |    '29'    |
+    // |      p4      |    '30'    |    '39'    |
+    // |      p5      |    '40'    |    '49'    |
+    // |______________|____________|____________|
+    //
+    // Given a filter: df.filter($"id".startsWith("2")),
+    // we take the substr of lowerBound and upperBound:
+    // ________________________________________________________________________________________
+    // | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
+    // |--------------|-----------------------------------|-----------------------------------|
+    // |      p1      |    '1'                            |    '9'                            |
+    // |      p2      |    '1'                            |    '1'                            |
+    // |      p3      |    '2'                            |    '2'                            |
+    // |      p4      |    '3'                            |    '3'                            |
+    // |      p5      |    '4'                            |    '4'                            |
+    // |______________|___________________________________|___________________________________|
+    //
+    // We can see that we only need to read p1 and p3.
+    case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
+      statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
+        l <= statsFor(a).upperBound.substr(0, Length(l))
   }
 
   lazy val partitionFilters: Seq[Expression] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index af493e9..b3a5c68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -170,6 +170,15 @@ class PartitionBatchPruningSuite
     }
   }
 
+  // Support `StartsWith` predicate
+  checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '18%'", 1, 1)(
+    180 to 189
+  )
+  checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '%'", 5, 11)(
+    100 to 200
+  )
+  checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE '18%' like s", 5, 11)(Seq())
+
   // With disable IN_MEMORY_PARTITION_PRUNING option
   test("disable IN_MEMORY_PARTITION_PRUNING") {
     spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)

