Posted to commits@spark.apache.org by we...@apache.org on 2018/03/30 07:08:06 UTC

spark git commit: [SPARK-23727][SQL] Support for pushing down filters for DateType in parquet

Repository: spark
Updated Branches:
  refs/heads/master df05fb63a -> b02e76cbf


[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet

## What changes were proposed in this pull request?

This PR adds support for pushing down filters for DateType columns in Parquet.
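
For illustration, this is the kind of query the change targets (a sketch only; the path and column name below are made up):

    // Assumes a SparkSession named `spark`; `/path/to/events` and `event_date` are hypothetical.
    import java.sql.Date

    val df = spark.read.parquet("/path/to/events")
    // With spark.sql.parquet.filterPushdown (and the new date flag) enabled, this
    // comparison can be handed to the Parquet reader as a filter predicate instead
    // of being evaluated only after the rows have been read.
    df.filter(df("event_date") >= Date.valueOf("2018-03-18"))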

## How was this patch tested?

Added unit tests and tested locally.

Author: yucai <yy...@ebay.com>

Closes #20851 from yucai/SPARK-23727.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b02e76cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b02e76cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b02e76cb

Branch: refs/heads/master
Commit: b02e76cbffe9e589b7a4e60f91250ca12a4420b2
Parents: df05fb6
Author: yucai <yy...@ebay.com>
Authored: Fri Mar 30 15:07:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 30 15:07:38 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../datasources/parquet/ParquetFilters.scala    | 33 +++++++++++++
 .../parquet/ParquetFilterSuite.scala            | 50 ++++++++++++++++++--
 3 files changed, 89 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9cb03b5..13f31a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -353,6 +353,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
+    .doc("If true, enables Parquet filter push-down optimization for Date. " +
+      "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+    .internal()
+    .booleanConf
+    .createWithDefault(true)
+
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1329,6 +1336,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
+  def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
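
A hedged usage sketch for the new flag (assumes a SparkSession named `spark`; both keys appear in the diff above). The new entry is registered as an internal conf and defaults to true, so it mainly serves as an escape hatch:

    // Date pushdown only takes effect when the general Parquet pushdown flag is also on.
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")
    spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")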

http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 763841e..ccc8306 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.sql.Date
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
  */
 private[parquet] object ParquetFilters {
 
+  private def dateToDays(date: Date): SQLDate = {
+    DateTimeUtils.fromJavaDate(date)
+  }
+
   private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
     case BooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.eq(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +85,10 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.notEq(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +108,10 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.lt(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +131,10 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.ltEq(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.gt(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +177,10 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.gtEq(
+        intColumn(n),
+        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
   /**
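
To make the new DateType cases concrete: a java.sql.Date is converted to days since the Unix epoch (DateTimeUtils.fromJavaDate), matching Parquet's INT32 date encoding, and the predicate is built against an int column. A standalone sketch (the column name `d` is made up):

    import java.sql.Date
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.FilterApi.intColumn
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    val days: Integer = DateTimeUtils.fromJavaDate(Date.valueOf("2018-03-18"))  // 17608 days since 1970-01-01
    val pred = FilterApi.gtEq(intColumn("d"), days)                             // gtEq(d, 17608)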

http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 3380195..1d3476e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.charset.StandardCharsets
+import java.sql.Date
 
 import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+    withSQLConf(
+      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val query = df
           .select(output.map(e => Column(e)): _*)
           .where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           maybeFilter.exists(_.getClass === filterClass)
         }
         checker(stripSparkFilter(query), expected)
-      }
     }
   }
 
@@ -313,6 +315,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("filter pushdown - date") {
+    implicit class StringToDate(s: String) {
+      def date: Date = Date.valueOf(s)
+    }
+
+    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+
+    withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))
+
+      checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
+      checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
+      checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
+        Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))
+
+      checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
+      checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
+      checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
+      checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)
+
+      checkFilterPredicate(
+        Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
+      checkFilterPredicate(
+        Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
+      checkFilterPredicate(
+        Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
+      checkFilterPredicate(
+        Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
+      checkFilterPredicate(
+        Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
+      checkFilterPredicate(
+        Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)
+
+      checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
+      checkFilterPredicate(
+        '_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
+        classOf[Operators.Or],
+        Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
+    }
+  }
+
   test("SPARK-6554: don't push down predicates which reference partition columns") {
     import testImplicits._
 

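Beyond the unit tests above, a quick manual check could look like the following sketch (not part of this patch; assumes a SparkSession named `spark` and a hypothetical scratch path):

    import java.sql.Date
    import spark.implicits._

    Seq(Date.valueOf("2018-03-18"), Date.valueOf("2018-03-21"))
      .toDF("d")
      .write.mode("overwrite").parquet("/tmp/date_pushdown_demo")

    val q = spark.read.parquet("/tmp/date_pushdown_demo")
      .filter($"d" > Date.valueOf("2018-03-19"))
    q.explain()  // with both pushdown flags enabled, PushedFilters should include the date predicate
    q.show()     // expected to return only 2018-03-21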

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org