You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/07/10 07:58:20 UTC
spark git commit: [SPARK-24706][SQL] ByteType and ShortType support pushdown to parquet
Repository: spark
Updated Branches:
refs/heads/master 4984f1af7 -> a28900956
[SPARK-24706][SQL] ByteType and ShortType support pushdown to parquet
## What changes were proposed in this pull request?
`ByteType` and `ShortType` support pushdown to the Parquet data source.
[Benchmark result](https://issues.apache.org/jira/browse/SPARK-24706?focusedCommentId=16528878&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528878).
## How was this patch tested?
unit tests
Author: Yuming Wang <yu...@ebay.com>
Closes #21682 from wangyum/SPARK-24706.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2890095
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2890095
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2890095
Branch: refs/heads/master
Commit: a289009567c1566a1df4bcdfdf0111e82ae3d81d
Parents: 4984f1a
Author: Yuming Wang <yu...@ebay.com>
Authored: Tue Jul 10 15:58:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 10 15:58:14 2018 +0800
----------------------------------------------------------------------
.../FilterPushdownBenchmark-results.txt | 32 +++++------
.../datasources/parquet/ParquetFilters.scala | 34 +++++++-----
.../parquet/ParquetFilterSuite.scala | 56 ++++++++++++++++++++
3 files changed, 94 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index 29fe434..110669b 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -542,39 +542,39 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 1 tinyint row (value = CAST(63 AS tinyint)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 3726 / 3775 4.2 236.9 1.0X
-Parquet Vectorized (Pushdown) 3741 / 3789 4.2 237.9 1.0X
-Native ORC Vectorized 2793 / 2909 5.6 177.6 1.3X
-Native ORC Vectorized (Pushdown) 530 / 561 29.7 33.7 7.0X
+Parquet Vectorized 3461 / 3997 4.5 220.1 1.0X
+Parquet Vectorized (Pushdown) 270 / 315 58.4 17.1 12.8X
+Native ORC Vectorized 4107 / 5372 3.8 261.1 0.8X
+Native ORC Vectorized (Pushdown) 778 / 1553 20.2 49.5 4.4X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 10% tinyint rows (value < CAST(12 AS tinyint)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 4385 / 4406 3.6 278.8 1.0X
-Parquet Vectorized (Pushdown) 4398 / 4454 3.6 279.6 1.0X
-Native ORC Vectorized 3420 / 3501 4.6 217.4 1.3X
-Native ORC Vectorized (Pushdown) 1395 / 1432 11.3 88.7 3.1X
+Parquet Vectorized 4771 / 6655 3.3 303.3 1.0X
+Parquet Vectorized (Pushdown) 1322 / 1606 11.9 84.0 3.6X
+Native ORC Vectorized 4437 / 4572 3.5 282.1 1.1X
+Native ORC Vectorized (Pushdown) 1781 / 1976 8.8 113.2 2.7X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 50% tinyint rows (value < CAST(63 AS tinyint)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 7307 / 7394 2.2 464.6 1.0X
-Parquet Vectorized (Pushdown) 7411 / 7461 2.1 471.2 1.0X
-Native ORC Vectorized 6501 / 7814 2.4 413.4 1.1X
-Native ORC Vectorized (Pushdown) 7341 / 8637 2.1 466.7 1.0X
+Parquet Vectorized 7433 / 7752 2.1 472.6 1.0X
+Parquet Vectorized (Pushdown) 5863 / 5913 2.7 372.8 1.3X
+Native ORC Vectorized 7986 / 8084 2.0 507.7 0.9X
+Native ORC Vectorized (Pushdown) 6522 / 6608 2.4 414.6 1.1X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 90% tinyint rows (value < CAST(114 AS tinyint)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 11886 / 13122 1.3 755.7 1.0X
-Parquet Vectorized (Pushdown) 12557 / 14173 1.3 798.4 0.9X
-Native ORC Vectorized 10758 / 11971 1.5 684.0 1.1X
-Native ORC Vectorized (Pushdown) 10564 / 10713 1.5 671.6 1.1X
+Parquet Vectorized 11190 / 11519 1.4 711.4 1.0X
+Parquet Vectorized (Pushdown) 10861 / 11206 1.4 690.5 1.0X
+Native ORC Vectorized 11622 / 12196 1.4 738.9 1.0X
+Native ORC Vectorized (Pushdown) 11377 / 11654 1.4 723.3 1.0X
http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 4827f70..4c9b940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -45,6 +45,8 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
decimalMetadata: DecimalMetadata)
private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+ private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
+ private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
private val ParquetLongType = ParquetSchemaType(null, INT64, null)
private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
@@ -60,8 +62,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.eq(
+ intColumn(n),
+ Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
@@ -87,8 +91,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.notEq(
+ intColumn(n),
+ Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
@@ -111,8 +117,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
}
private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) =>
+ FilterApi.lt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
@@ -132,8 +139,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
}
private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
@@ -153,8 +161,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
}
private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) =>
+ FilterApi.gt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
@@ -174,8 +183,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
}
private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
- case ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetByteType | ParquetShortType | ParquetIntegerType =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
(n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
case ParquetFloatType =>
http://git-wip-us.apache.org/repos/asf/spark/blob/a2890095/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f2c0bda..067d2fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -179,6 +179,62 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ test("filter pushdown - tinyint") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toByte)))) { implicit df =>
+ assert(df.schema.head.dataType === ByteType)
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1.toByte, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1.toByte, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 =!= 1.toByte, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2.toByte, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3.toByte, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1.toByte, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4.toByte, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1.toByte) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1.toByte) <=> '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2.toByte) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3.toByte) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1.toByte) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4.toByte) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4.toByte), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2.toByte || '_1 > 3.toByte,
+ classOf[Operators.Or], Seq(Row(1), Row(4)))
+ }
+ }
+
+ test("filter pushdown - smallint") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df =>
+ assert(df.schema.head.dataType === ShortType)
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 === 1.toShort, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 <=> 1.toShort, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 =!= 1.toShort, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('_1 < 2.toShort, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3.toShort, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1.toShort, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4.toShort, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1.toShort) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1.toShort) <=> '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2.toShort) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3.toShort) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1.toShort) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4.toShort) <= '_1, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('_1 < 4.toShort), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2.toShort || '_1 > 3.toShort,
+ classOf[Operators.Or], Seq(Row(1), Row(4)))
+ }
+ }
+
test("filter pushdown - integer") {
withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org