You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/30 07:08:06 UTC
spark git commit: [SPARK-23727][SQL] Support for pushing down filters
for DateType in parquet
Repository: spark
Updated Branches:
refs/heads/master df05fb63a -> b02e76cbf
[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet
## What changes were proposed in this pull request?
This PR adds support for pushing down filters for DateType in Parquet.
## How was this patch tested?
Added unit tests and tested locally.
Author: yucai <yy...@ebay.com>
Closes #20851 from yucai/SPARK-23727.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b02e76cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b02e76cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b02e76cb
Branch: refs/heads/master
Commit: b02e76cbffe9e589b7a4e60f91250ca12a4420b2
Parents: df05fb6
Author: yucai <yy...@ebay.com>
Authored: Fri Mar 30 15:07:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 30 15:07:38 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++
.../datasources/parquet/ParquetFilters.scala | 33 +++++++++++++
.../parquet/ParquetFilterSuite.scala | 50 ++++++++++++++++++--
3 files changed, 89 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9cb03b5..13f31a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -353,6 +353,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
+ .doc("If true, enables Parquet filter push-down optimization for Date. " +
+ "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1329,6 +1336,8 @@ class SQLConf extends Serializable with Logging {
def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
+ def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
+
def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 763841e..ccc8306 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,10 +17,15 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.sql.Date
+
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
*/
private[parquet] object ParquetFilters {
+ private def dateToDays(date: Date): SQLDate = {
+ DateTimeUtils.fromJavaDate(date)
+ }
+
private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.eq(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +85,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.notEq(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +108,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.lt(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +131,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.ltEq(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.gt(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +177,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ (n: String, v: Any) => FilterApi.gtEq(
+ intColumn(n),
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 3380195..1d3476e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.nio.charset.StandardCharsets
+import java.sql.Date
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ withSQLConf(
+ SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
- }
}
}
@@ -313,6 +315,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
+ test("filter pushdown - date") {
+ implicit class StringToDate(s: String) {
+ def date: Date = Date.valueOf(s)
+ }
+
+ val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+
+ withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))
+
+ checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
+ checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
+ checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
+ Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))
+
+ checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
+ checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
+ checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
+ checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)
+
+ checkFilterPredicate(
+ Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
+ checkFilterPredicate(
+ Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
+ checkFilterPredicate(
+ Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
+ checkFilterPredicate(
+ Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
+ checkFilterPredicate(
+ Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
+ checkFilterPredicate(
+ Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)
+
+ checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
+ checkFilterPredicate(
+ '_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
+ classOf[Operators.Or],
+ Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
+ }
+ }
+
test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org