You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/07/16 07:45:32 UTC
spark git commit: [SPARK-24549][SQL] Support Decimal type push down
to the parquet data sources
Repository: spark
Updated Branches:
refs/heads/master 2603ae30b -> 9549a2814
[SPARK-24549][SQL] Support Decimal type push down to the parquet data sources
## What changes were proposed in this pull request?
Support Decimal type push down to the parquet data sources.
The Decimal comparator used is: [`BINARY_AS_SIGNED_INTEGER_COMPARATOR`](https://github.com/apache/parquet-mr/blob/c6764c4a0848abf1d581e22df8b33e28ee9f2ced/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java#L224-L292).
## How was this patch tested?
unit tests and manual tests.
**manual tests**:
```scala
spark.range(10000000).selectExpr("id", "cast(id as decimal(9)) as d1", "cast(id as decimal(9, 2)) as d2", "cast(id as decimal(18)) as d3", "cast(id as decimal(18, 4)) as d4", "cast(id as decimal(38)) as d5", "cast(id as decimal(38, 18)) as d6").coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/decimal")
val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
spark.sql("set spark.sql.parquet.filterPushdown.decimal=true")
// Only reads about 1 MB of data
df.filter("d2 = 10000").show
// Only reads about 1 MB of data
df.filter("d4 = 10000").show
spark.sql("set spark.sql.parquet.filterPushdown.decimal=false")
// Reads 174.3 MB of data
df.filter("d2 = 10000").show
// Reads 174.3 MB of data
df.filter("d4 = 10000").show
```
Author: Yuming Wang <yu...@ebay.com>
Closes #21556 from wangyum/SPARK-24549.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9549a281
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9549a281
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9549a281
Branch: refs/heads/master
Commit: 9549a2814951f9ba969955d78ac4bd2240f85989
Parents: 2603ae3
Author: Yuming Wang <yu...@ebay.com>
Authored: Mon Jul 16 15:44:51 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jul 16 15:44:51 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +
.../FilterPushdownBenchmark-results.txt | 96 ++++----
.../datasources/parquet/ParquetFileFormat.scala | 3 +-
.../datasources/parquet/ParquetFilters.scala | 225 ++++++++++++++-----
.../benchmark/FilterPushdownBenchmark.scala | 8 +-
.../parquet/ParquetFilterSuite.scala | 90 +++++++-
6 files changed, 324 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 07d33fa..41fe0c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -387,6 +387,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED =
+ buildConf("spark.sql.parquet.filterPushdown.decimal")
+ .doc("If true, enables Parquet filter push-down optimization for Decimal. " +
+ "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
@@ -1505,6 +1513,8 @@ class SQLConf extends Serializable with Logging {
def parquetFilterPushDownTimestamp: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED)
+ def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
+
def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index 4f38cc4..2215ed9 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 3785 / 3867 4.2 240.6 1.0X
-Parquet Vectorized (Pushdown) 3820 / 3928 4.1 242.9 1.0X
-Native ORC Vectorized 3981 / 4049 4.0 253.1 1.0X
-Native ORC Vectorized (Pushdown) 702 / 735 22.4 44.6 5.4X
+Parquet Vectorized 4546 / 4743 3.5 289.0 1.0X
+Parquet Vectorized (Pushdown) 161 / 175 98.0 10.2 28.3X
+Native ORC Vectorized 5721 / 5842 2.7 363.7 0.8X
+Native ORC Vectorized (Pushdown) 1019 / 1070 15.4 64.8 4.5X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 10% decimal(9, 2) rows (value < 1572864): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 4694 / 4813 3.4 298.4 1.0X
-Parquet Vectorized (Pushdown) 4839 / 4907 3.3 307.6 1.0X
-Native ORC Vectorized 4943 / 5032 3.2 314.2 0.9X
-Native ORC Vectorized (Pushdown) 2043 / 2085 7.7 129.9 2.3X
+Parquet Vectorized 6340 / 7236 2.5 403.1 1.0X
+Parquet Vectorized (Pushdown) 3052 / 3164 5.2 194.1 2.1X
+Native ORC Vectorized 8370 / 9214 1.9 532.1 0.8X
+Native ORC Vectorized (Pushdown) 4137 / 4242 3.8 263.0 1.5X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 50% decimal(9, 2) rows (value < 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 8321 / 8472 1.9 529.0 1.0X
-Parquet Vectorized (Pushdown) 8125 / 8471 1.9 516.6 1.0X
-Native ORC Vectorized 8524 / 8616 1.8 541.9 1.0X
-Native ORC Vectorized (Pushdown) 7961 / 8383 2.0 506.1 1.0X
+Parquet Vectorized 12976 / 13249 1.2 825.0 1.0X
+Parquet Vectorized (Pushdown) 12655 / 13570 1.2 804.6 1.0X
+Native ORC Vectorized 15562 / 15950 1.0 989.4 0.8X
+Native ORC Vectorized (Pushdown) 15042 / 15668 1.0 956.3 0.9X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 90% decimal(9, 2) rows (value < 14155776): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 9587 / 10112 1.6 609.5 1.0X
-Parquet Vectorized (Pushdown) 9726 / 10370 1.6 618.3 1.0X
-Native ORC Vectorized 10119 / 11147 1.6 643.4 0.9X
-Native ORC Vectorized (Pushdown) 9366 / 9497 1.7 595.5 1.0X
+Parquet Vectorized 14303 / 14616 1.1 909.3 1.0X
+Parquet Vectorized (Pushdown) 14380 / 14649 1.1 914.3 1.0X
+Native ORC Vectorized 16964 / 17358 0.9 1078.5 0.8X
+Native ORC Vectorized (Pushdown) 17255 / 17874 0.9 1097.0 0.8X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 1 decimal(18, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 4060 / 4093 3.9 258.1 1.0X
-Parquet Vectorized (Pushdown) 4037 / 4125 3.9 256.6 1.0X
-Native ORC Vectorized 4756 / 4811 3.3 302.4 0.9X
-Native ORC Vectorized (Pushdown) 824 / 889 19.1 52.4 4.9X
+Parquet Vectorized 4701 / 6416 3.3 298.9 1.0X
+Parquet Vectorized (Pushdown) 128 / 164 122.8 8.1 36.7X
+Native ORC Vectorized 5698 / 7904 2.8 362.3 0.8X
+Native ORC Vectorized (Pushdown) 913 / 942 17.2 58.0 5.2X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 10% decimal(18, 2) rows (value < 1572864): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 5157 / 5271 3.0 327.9 1.0X
-Parquet Vectorized (Pushdown) 5051 / 5141 3.1 321.1 1.0X
-Native ORC Vectorized 5723 / 6146 2.7 363.9 0.9X
-Native ORC Vectorized (Pushdown) 2198 / 2317 7.2 139.8 2.3X
+Parquet Vectorized 5376 / 5461 2.9 341.8 1.0X
+Parquet Vectorized (Pushdown) 1479 / 1543 10.6 94.0 3.6X
+Native ORC Vectorized 6640 / 6748 2.4 422.2 0.8X
+Native ORC Vectorized (Pushdown) 2438 / 2479 6.5 155.0 2.2X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 50% decimal(18, 2) rows (value < 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 8608 / 8647 1.8 547.3 1.0X
-Parquet Vectorized (Pushdown) 8471 / 8584 1.9 538.6 1.0X
-Native ORC Vectorized 9249 / 10048 1.7 588.0 0.9X
-Native ORC Vectorized (Pushdown) 7645 / 8091 2.1 486.1 1.1X
+Parquet Vectorized 9224 / 9356 1.7 586.5 1.0X
+Parquet Vectorized (Pushdown) 7172 / 7415 2.2 456.0 1.3X
+Native ORC Vectorized 11017 / 11408 1.4 700.4 0.8X
+Native ORC Vectorized (Pushdown) 8771 / 10218 1.8 557.7 1.1X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 90% decimal(18, 2) rows (value < 14155776): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 11658 / 11888 1.3 741.2 1.0X
-Parquet Vectorized (Pushdown) 11812 / 12098 1.3 751.0 1.0X
-Native ORC Vectorized 12943 / 13312 1.2 822.9 0.9X
-Native ORC Vectorized (Pushdown) 13139 / 13465 1.2 835.4 0.9X
+Parquet Vectorized 13933 / 15990 1.1 885.8 1.0X
+Parquet Vectorized (Pushdown) 12683 / 12942 1.2 806.4 1.1X
+Native ORC Vectorized 16344 / 20196 1.0 1039.1 0.9X
+Native ORC Vectorized (Pushdown) 15162 / 16627 1.0 964.0 0.9X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 1 decimal(38, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 5491 / 5716 2.9 349.1 1.0X
-Parquet Vectorized (Pushdown) 5515 / 5615 2.9 350.6 1.0X
-Native ORC Vectorized 4582 / 4654 3.4 291.3 1.2X
-Native ORC Vectorized (Pushdown) 815 / 861 19.3 51.8 6.7X
+Parquet Vectorized 7102 / 8282 2.2 451.5 1.0X
+Parquet Vectorized (Pushdown) 124 / 150 126.4 7.9 57.1X
+Native ORC Vectorized 5811 / 6883 2.7 369.5 1.2X
+Native ORC Vectorized (Pushdown) 1121 / 1502 14.0 71.3 6.3X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 10% decimal(38, 2) rows (value < 1572864): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 6432 / 6527 2.4 409.0 1.0X
-Parquet Vectorized (Pushdown) 6513 / 6607 2.4 414.1 1.0X
-Native ORC Vectorized 5618 / 6085 2.8 357.2 1.1X
-Native ORC Vectorized (Pushdown) 2403 / 2443 6.5 152.8 2.7X
+Parquet Vectorized 6894 / 7562 2.3 438.3 1.0X
+Parquet Vectorized (Pushdown) 1863 / 1980 8.4 118.4 3.7X
+Native ORC Vectorized 6812 / 6848 2.3 433.1 1.0X
+Native ORC Vectorized (Pushdown) 2511 / 2598 6.3 159.7 2.7X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 50% decimal(38, 2) rows (value < 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 11041 / 11467 1.4 701.9 1.0X
-Parquet Vectorized (Pushdown) 10909 / 11484 1.4 693.5 1.0X
-Native ORC Vectorized 9860 / 10436 1.6 626.9 1.1X
-Native ORC Vectorized (Pushdown) 7908 / 8069 2.0 502.8 1.4X
+Parquet Vectorized 11732 / 12183 1.3 745.9 1.0X
+Parquet Vectorized (Pushdown) 8912 / 9945 1.8 566.6 1.3X
+Native ORC Vectorized 11499 / 12387 1.4 731.1 1.0X
+Native ORC Vectorized (Pushdown) 9328 / 9382 1.7 593.1 1.3X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Select 90% decimal(38, 2) rows (value < 14155776): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Parquet Vectorized 14816 / 16877 1.1 942.0 1.0X
-Parquet Vectorized (Pushdown) 15383 / 15740 1.0 978.0 1.0X
-Native ORC Vectorized 14408 / 14771 1.1 916.0 1.0X
-Native ORC Vectorized (Pushdown) 13968 / 14805 1.1 888.1 1.1X
+Parquet Vectorized 16272 / 16328 1.0 1034.6 1.0X
+Parquet Vectorized (Pushdown) 15714 / 18100 1.0 999.1 1.0X
+Native ORC Vectorized 16539 / 18897 1.0 1051.5 1.0X
+Native ORC Vectorized (Pushdown) 16328 / 17306 1.0 1038.1 1.0X
================================================================================================
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3ec33b2..295960b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -342,6 +342,7 @@ class ParquetFileFormat
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
@@ -367,7 +368,7 @@ class ParquetFileFormat
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
.getFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp,
+ val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0c146f2..58b4a76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.lang.{Long => JLong}
+import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -41,44 +42,65 @@ import org.apache.spark.unsafe.types.UTF8String
private[parquet] class ParquetFilters(
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int) {
private case class ParquetSchemaType(
originalType: OriginalType,
primitiveTypeName: PrimitiveTypeName,
+ length: Int,
decimalMetadata: DecimalMetadata)
- private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
- private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
- private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
- private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
- private val ParquetLongType = ParquetSchemaType(null, INT64, null)
- private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
- private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
- private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
- private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
- private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
- private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, null)
- private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null)
+ private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null)
+ private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+ private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+ private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+ private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+ private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+ private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+ private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+ private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+ private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
+ private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
+ private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}
+ private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
+
+ private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue()
+
+ private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = {
+ val decimalBuffer = new Array[Byte](numBytes)
+ val bytes = decimal.unscaledValue().toByteArray
+
+ val fixedLengthBytes = if (bytes.length == numBytes) {
+ bytes
+ } else {
+ val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+ java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
+ System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
+ decimalBuffer
+ }
+ Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
+ }
+
private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
- (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+ (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean])
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[JDouble])
// Binary.fromString and Binary.fromByteArray don't accept null values
case ParquetStringType =>
@@ -102,21 +124,34 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.eq(
+ intColumn(n),
+ Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.eq(
+ longColumn(n),
+ Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.eq(
+ binaryColumn(n),
+ Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
}
private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
case ParquetBooleanType =>
- (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+ (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[JBoolean])
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[JDouble])
case ParquetStringType =>
(n: String, v: Any) => FilterApi.notEq(
@@ -139,6 +174,19 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.notEq(
+ intColumn(n),
+ Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.notEq(
+ longColumn(n),
+ Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) => FilterApi.notEq(
+ binaryColumn(n),
+ Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
}
private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -146,11 +194,11 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) =>
FilterApi.lt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[JDouble])
case ParquetStringType =>
(n: String, v: Any) =>
@@ -169,6 +217,16 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.lt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -176,11 +234,11 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) =>
FilterApi.ltEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[JDouble])
case ParquetStringType =>
(n: String, v: Any) =>
@@ -199,6 +257,16 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -206,11 +274,11 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) =>
FilterApi.gt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[JDouble])
case ParquetStringType =>
(n: String, v: Any) =>
@@ -229,6 +297,16 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.gt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -236,11 +314,11 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) =>
FilterApi.gtEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[JDouble])
case ParquetStringType =>
(n: String, v: Any) =>
@@ -259,6 +337,16 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+
+ case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
/**
@@ -271,7 +359,7 @@ private[parquet] class ParquetFilters(
// and it does not support to create filters for them.
m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
f.getName -> ParquetSchemaType(
- f.getOriginalType, f.getPrimitiveTypeName, f.getDecimalMetadata)
+ f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)
}.toMap
case _ => Map.empty[String, ParquetSchemaType]
}
@@ -282,21 +370,45 @@ private[parquet] class ParquetFilters(
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToType = getFieldMap(schema)
+ // For the Decimal type, the filter value's scale must match the scale declared in the file.
+ // If they do not match, pushing down the filter would produce incorrect results.
+ def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+ case decimal: JBigDecimal =>
+ decimal.scale == decimalMeta.getScale
+ case _ => false
+ }
+
+ // The Parquet type in the given file should match the value's type
+ // in the pushed filter in order to push down the filter to Parquet.
+ def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+ value == null || (nameToType(name) match {
+ case ParquetBooleanType => value.isInstanceOf[JBoolean]
+ case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+ case ParquetLongType => value.isInstanceOf[JLong]
+ case ParquetFloatType => value.isInstanceOf[JFloat]
+ case ParquetDoubleType => value.isInstanceOf[JDouble]
+ case ParquetStringType => value.isInstanceOf[String]
+ case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+ case ParquetDateType => value.isInstanceOf[Date]
+ case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
+ value.isInstanceOf[Timestamp]
+ case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
+ isDecimalMatched(value, decimalMeta)
+ case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
+ isDecimalMatched(value, decimalMeta)
+ case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
+ isDecimalMatched(value, decimalMeta)
+ case _ => false
+ })
+ }
+
// Parquet does not allow dots in the column name because dots are used as a column path
// delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
// with missing columns. Incorrect results could be returned by Parquet when we push down
// filters for a column having dots in its name. Thus, we do not push down such filters.
// See SPARK-20364.
- def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
-
- // All DataTypes that support `makeEq` can provide better performance.
- def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match {
- case ParquetBooleanType | ParquetByteType | ParquetShortType | ParquetIntegerType
- | ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
- | ParquetBinaryType => true
- case ParquetDateType if pushDownDate => true
- case ParquetTimestampMicrosType | ParquetTimestampMillisType if pushDownTimestamp => true
- case _ => false
+ def canMakeFilterOn(name: String, value: Any): Boolean = {
+ nameToType.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
}
// NOTE:
@@ -315,29 +427,29 @@ private[parquet] class ParquetFilters(
// Probably I missed something and obviously this should be changed.
predicate match {
- case sources.IsNull(name) if canMakeFilterOn(name) =>
+ case sources.IsNull(name) if canMakeFilterOn(name, null) =>
makeEq.lift(nameToType(name)).map(_(name, null))
- case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+ case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
makeNotEq.lift(nameToType(name)).map(_(name, null))
- case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+ case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
makeEq.lift(nameToType(name)).map(_(name, value))
- case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+ case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
makeNotEq.lift(nameToType(name)).map(_(name, value))
- case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+ case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
makeEq.lift(nameToType(name)).map(_(name, value))
- case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+ case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
makeNotEq.lift(nameToType(name)).map(_(name, value))
- case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+ case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
makeLt.lift(nameToType(name)).map(_(name, value))
- case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+ case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
makeLtEq.lift(nameToType(name)).map(_(name, value))
- case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+ case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
makeGt.lift(nameToType(name)).map(_(name, value))
- case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+ case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
makeGtEq.lift(nameToType(name)).map(_(name, value))
case sources.And(lhs, rhs) =>
@@ -362,13 +474,14 @@ private[parquet] class ParquetFilters(
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)
- case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name)
+ case sources.In(name, values) if canMakeFilterOn(name, values.head)
&& values.distinct.length <= pushDownInFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToType(name)).map(_(name, v))
}.reduceLeftOption(FilterApi.or)
- case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
+ case sources.StringStartsWith(name, prefix)
+ if pushDownStartWith && canMakeFilterOn(name, prefix) =>
Option(prefix).map { v =>
FilterApi.userDefined(binaryColumn(name),
new UserDefinedPredicate[Binary] with Serializable {
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 567a8eb..bdb60b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -290,8 +290,12 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
s"decimal(${DecimalType.MAX_PRECISION}, 2)"
).foreach { dt =>
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
- val df = spark.range(numRows).selectExpr(columns: _*)
- .withColumn("value", monotonically_increasing_id().cast(dt))
+ val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) {
+ monotonically_increasing_id() % 9999999
+ } else {
+ monotonically_increasing_id()
+ }
+ val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
withTempTable("orcTable", "patquetTable") {
saveAsTable(df, dir)
http://git-wip-us.apache.org/repos/asf/spark/blob/9549a281/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 924f136..be4f498 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
@@ -58,7 +59,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
private lazy val parquetFilters =
new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
- conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold)
+ conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+ conf.parquetFilterPushDownInFilterThreshold)
override def beforeEach(): Unit = {
super.beforeEach()
@@ -86,6 +88,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true",
+ SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
@@ -179,6 +182,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ private def testDecimalPushDown(data: DataFrame)(f: DataFrame => Unit): Unit = {
+ withTempPath { file =>
+ data.write.parquet(file.getCanonicalPath)
+ readParquetFile(file.toString)(f)
+ }
+ }
+
// This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
withTempPath { dir =>
@@ -512,6 +522,84 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ test("filter pushdown - decimal") {
+ Seq(true, false).foreach { legacyFormat =>
+ withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) {
+ Seq(
+ s"a decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 32BitDecimalType
+ s"a decimal(${Decimal.MAX_LONG_DIGITS}, 2)", // 64BitDecimalType
+ "a decimal(38, 18)" // ByteArrayDecimalType
+ ).foreach { schemaDDL =>
+ val schema = StructType.fromDDL(schemaDDL)
+ val rdd =
+ spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
+ val dataFrame = spark.createDataFrame(rdd, schema)
+ testDecimalPushDown(dataFrame) { implicit df =>
+ assert(df.schema === schema)
+ checkFilterPredicate('a.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('a.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('a === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('a <=> 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('a =!= 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate('a < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate('a > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate('a <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('a >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === 'a, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(1) <=> 'a, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > 'a, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < 'a, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= 'a, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= 'a, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!('a < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('a < 2 || 'a > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
+ }
+ }
+ }
+ }
+ }
+
+ test("Ensure that filter value matched the parquet file schema") {
+ val scale = 2
+ val schema = StructType(Seq(
+ StructField("cint", IntegerType),
+ StructField("cdecimal1", DecimalType(Decimal.MAX_INT_DIGITS, scale)),
+ StructField("cdecimal2", DecimalType(Decimal.MAX_LONG_DIGITS, scale)),
+ StructField("cdecimal3", DecimalType(DecimalType.MAX_PRECISION, scale))
+ ))
+
+ val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+ val decimal = new JBigDecimal(10).setScale(scale)
+ val decimal1 = new JBigDecimal(10).setScale(scale + 1)
+ assert(decimal.scale() === scale)
+ assert(decimal1.scale() === scale + 1)
+
+ assertResult(Some(lt(intColumn("cdecimal1"), 1000: Integer))) {
+ parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal))
+ }
+ assertResult(None) {
+ parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal1))
+ }
+
+ assertResult(Some(lt(longColumn("cdecimal2"), 1000L: java.lang.Long))) {
+ parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal2", decimal))
+ }
+ assertResult(None) {
+ parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal2", decimal1))
+ }
+
+ assert(parquetFilters.createFilter(
+ parquetSchema, sources.LessThan("cdecimal3", decimal)).isDefined)
+ assertResult(None) {
+ parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal3", decimal1))
+ }
+ }
+
test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org