You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/05/04 01:27:24 UTC

spark git commit: [SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side

Repository: spark
Updated Branches:
  refs/heads/master e646ae67f -> 0c23e254c


[SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190, to make it easier to backport.

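`ParquetFilters` is used in the file scan function, which is executed on the executor side, so we can't call `conf.parquetFilterPushDownDate` there. Instead, this patch reads the flag once, outside the per-file read function, and passes it to `ParquetFilters` as a constructor argument.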

## How was this patch tested?

It's tested in #21190.

Author: Wenchen Fan <we...@databricks.com>

Closes #21224 from cloud-fan/minor2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c23e254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c23e254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c23e254

Branch: refs/heads/master
Commit: 0c23e254c38d4a9210939e1e1b0074278568abed
Parents: e646ae6
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri May 4 09:27:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri May 4 09:27:14 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala      |  3 ++-
 .../datasources/parquet/ParquetFilters.scala         | 15 +++++++--------
 .../datasources/parquet/ParquetFilterSuite.scala     | 10 ++++++----
 3 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d8f47ee..d1f9e11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -342,6 +342,7 @@ class ParquetFileFormat
       sparkSession.sessionState.conf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -352,7 +353,7 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+          .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index ccc8306..3106261 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -25,14 +25,13 @@ import org.apache.parquet.io.api.Binary
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] object ParquetFilters {
+private[parquet] class ParquetFilters(pushDownDate: Boolean) {
 
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -59,7 +58,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -85,7 +84,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -108,7 +107,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.lt(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -131,7 +130,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.ltEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -154,7 +153,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.gt(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -177,7 +176,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.gtEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 1d3476e..667e0b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,6 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
+  private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     // Note that there are many tests here that require record-level filtering set to be true.
@@ -99,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         assert(selectedFilters.nonEmpty, "No filter is pushed down")
 
         selectedFilters.foreach { pred =>
-          val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+          val maybeFilter = parquetFilters.createFilter(df.schema, pred)
           assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
           // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
           maybeFilter.exists(_.getClass === filterClass)
@@ -517,7 +519,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
     ) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.And(
           sources.LessThan("a", 10),
@@ -525,7 +527,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
 
     assertResult(None) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.And(
           sources.LessThan("a", 10),
@@ -533,7 +535,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
 
     assertResult(None) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.Not(
           sources.And(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org