Posted to commits@spark.apache.org by we...@apache.org on 2018/07/10 07:58:20 UTC

spark git commit: [SPARK-24706][SQL] ByteType and ShortType support pushdown to parquet

Repository: spark
Updated Branches:
  refs/heads/master 4984f1af7 -> a28900956


[SPARK-24706][SQL] ByteType and ShortType support pushdown to parquet

## What changes were proposed in this pull request?

Add filter pushdown to the Parquet data source for `ByteType` and `ShortType` columns.
[Benchmark result](https://issues.apache.org/jira/browse/SPARK-24706?focusedCommentId=16528878&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528878).
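
The core of the change is a widening conversion: Parquet stores `INT_8` and `INT_16` values in an `INT32` physical column, so a filter value that arrives as a boxed `Byte` or `Short` has to become a `java.lang.Integer` before it is passed to `FilterApi`. The standalone sketch below reproduces only that conversion expression from the `eq`/`notEq` cases, without any Parquet dependency. The names `WidenToInt` and `toNullableInt` are hypothetical and exist only for illustration; they are not part of the patch.

```scala
import scala.util.Try

// Hypothetical helper object, not part of the patch.
object WidenToInt {
  // Same expression the patch uses for eq/notEq: null stays null,
  // any boxed numeric (Byte, Short, Integer) is widened to java.lang.Integer.
  def toNullableInt(v: Any): Integer =
    Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull

  def main(args: Array[String]): Unit = {
    val byteValue: Any = 63.toByte
    val shortValue: Any = 1000.toShort

    println(toNullableInt(byteValue))   // widened from java.lang.Byte
    println(toNullableInt(shortValue))  // widened from java.lang.Short
    println(toNullableInt(null))        // null is passed through unchanged

    // Casting a boxed Byte straight to Integer, as the old
    // ParquetIntegerType case did, throws ClassCastException.
    val direct = Try {
      val i: Integer = byteValue.asInstanceOf[Integer]
      i.intValue
    }
    println(direct.isFailure)
  }
}
```

Going through `Number.intValue` is what lets a single `case ParquetByteType | ParquetShortType | ParquetIntegerType` branch serve all three types in the diff below.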

## How was this patch tested?

Unit tests: new `tinyint` and `smallint` filter pushdown cases in `ParquetFilterSuite`.

Author: Yuming Wang <yu...@ebay.com>

Closes #21682 from wangyum/SPARK-24706.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2890095
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2890095
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2890095

Branch: refs/heads/master
Commit: a289009567c1566a1df4bcdfdf0111e82ae3d81d
Parents: 4984f1a
Author: Yuming Wang <yu...@ebay.com>
Authored: Tue Jul 10 15:58:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 10 15:58:14 2018 +0800

----------------------------------------------------------------------
 .../FilterPushdownBenchmark-results.txt         | 32 +++++------
 .../datasources/parquet/ParquetFilters.scala    | 34 +++++++-----
 .../parquet/ParquetFilterSuite.scala            | 56 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index 29fe434..110669b 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -542,39 +542,39 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 1 tinyint row (value = CAST(63 AS tinyint)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            3726 / 3775          4.2         236.9       1.0X
-Parquet Vectorized (Pushdown)                 3741 / 3789          4.2         237.9       1.0X
-Native ORC Vectorized                         2793 / 2909          5.6         177.6       1.3X
-Native ORC Vectorized (Pushdown)               530 /  561         29.7          33.7       7.0X
+Parquet Vectorized                            3461 / 3997          4.5         220.1       1.0X
+Parquet Vectorized (Pushdown)                  270 /  315         58.4          17.1      12.8X
+Native ORC Vectorized                         4107 / 5372          3.8         261.1       0.8X
+Native ORC Vectorized (Pushdown)               778 / 1553         20.2          49.5       4.4X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 10% tinyint rows (value < CAST(12 AS tinyint)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            4385 / 4406          3.6         278.8       1.0X
-Parquet Vectorized (Pushdown)                 4398 / 4454          3.6         279.6       1.0X
-Native ORC Vectorized                         3420 / 3501          4.6         217.4       1.3X
-Native ORC Vectorized (Pushdown)              1395 / 1432         11.3          88.7       3.1X
+Parquet Vectorized                            4771 / 6655          3.3         303.3       1.0X
+Parquet Vectorized (Pushdown)                 1322 / 1606         11.9          84.0       3.6X
+Native ORC Vectorized                         4437 / 4572          3.5         282.1       1.1X
+Native ORC Vectorized (Pushdown)              1781 / 1976          8.8         113.2       2.7X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 50% tinyint rows (value < CAST(63 AS tinyint)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7307 / 7394          2.2         464.6       1.0X
-Parquet Vectorized (Pushdown)                 7411 / 7461          2.1         471.2       1.0X
-Native ORC Vectorized                         6501 / 7814          2.4         413.4       1.1X
-Native ORC Vectorized (Pushdown)              7341 / 8637          2.1         466.7       1.0X
+Parquet Vectorized                            7433 / 7752          2.1         472.6       1.0X
+Parquet Vectorized (Pushdown)                 5863 / 5913          2.7         372.8       1.3X
+Native ORC Vectorized                         7986 / 8084          2.0         507.7       0.9X
+Native ORC Vectorized (Pushdown)              6522 / 6608          2.4         414.6       1.1X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 90% tinyint rows (value < CAST(114 AS tinyint)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                          11886 / 13122          1.3         755.7       1.0X
-Parquet Vectorized (Pushdown)               12557 / 14173          1.3         798.4       0.9X
-Native ORC Vectorized                       10758 / 11971          1.5         684.0       1.1X
-Native ORC Vectorized (Pushdown)            10564 / 10713          1.5         671.6       1.1X
+Parquet Vectorized                          11190 / 11519          1.4         711.4       1.0X
+Parquet Vectorized (Pushdown)               10861 / 11206          1.4         690.5       1.0X
+Native ORC Vectorized                       11622 / 12196          1.4         738.9       1.0X
+Native ORC Vectorized (Pushdown)            11377 / 11654          1.4         723.3       1.0X
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 4827f70..4c9b940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -45,6 +45,8 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
       decimalMetadata: DecimalMetadata)
 
   private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
   private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
   private val ParquetLongType = ParquetSchemaType(null, INT64, null)
   private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
@@ -60,8 +62,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
     case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.eq(
+        intColumn(n),
+        Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>
@@ -87,8 +91,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
     case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        intColumn(n),
+        Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>
@@ -111,8 +117,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   }
 
   private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) =>
+        FilterApi.lt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>
@@ -132,8 +139,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   }
 
   private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>
@@ -153,8 +161,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   }
 
   private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) =>
+        FilterApi.gt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>
@@ -174,8 +183,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   }
 
   private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
-    case ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
     case ParquetFloatType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f2c0bda..067d2fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -179,6 +179,62 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("filter pushdown - tinyint") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toByte)))) { implicit df =>
+      assert(df.schema.head.dataType === ByteType)
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1.toByte, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1.toByte, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 =!= 1.toByte, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2.toByte, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3.toByte, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1.toByte, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4.toByte, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1.toByte) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1.toByte) <=> '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2.toByte) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3.toByte) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1.toByte) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4.toByte) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4.toByte), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 < 2.toByte || '_1 > 3.toByte,
+        classOf[Operators.Or], Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - smallint") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df =>
+      assert(df.schema.head.dataType === ShortType)
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1.toShort, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1.toShort, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 =!= 1.toShort, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2.toShort, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3.toShort, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1.toShort, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4.toShort, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1.toShort) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1.toShort) <=> '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2.toShort) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3.toShort) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1.toShort) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4.toShort) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4.toShort), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 < 2.toShort || '_1 > 3.toShort,
+        classOf[Operators.Or], Seq(Row(1), Row(4)))
+    }
+  }
+
   test("filter pushdown - integer") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])

