Posted to commits@spark.apache.org by we...@apache.org on 2018/05/04 01:27:24 UTC
spark git commit: [SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side
Repository: spark
Updated Branches:
refs/heads/master e646ae67f -> 0c23e254c
[SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side
## What changes were proposed in this pull request?
This PR is extracted from #21190, to make it easier to backport.
`ParquetFilters` is used in the file scan function, which is executed at the executor side, so we can't call `conf.parquetFilterPushDownDate` there.
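The fix therefore resolves the flag on the driver and passes it into `ParquetFilters` as a constructor argument. Below is a minimal sketch of that driver-side capture pattern using only the public config API; the helper name `buildReader` and the string-based config lookup are illustrative, not the actual internals.

```scala
import org.apache.spark.sql.SparkSession

object PushDownDateSketch {
  // Runs on the driver, where the session config is available.
  def buildReader(spark: SparkSession): String => Boolean = {
    // Resolve the flag into a plain Boolean now, on the driver.
    val pushDownDate =
      spark.conf.get("spark.sql.parquet.filterPushdown.date", "true").toBoolean

    // The returned closure is shipped to executors and captures only the Boolean,
    // never the SQLConf itself.
    (file: String) => pushDownDate
  }
}
```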
## How was this patch tested?
It's tested in #21190.
Author: Wenchen Fan <we...@databricks.com>
Closes #21224 from cloud-fan/minor2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c23e254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c23e254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c23e254
Branch: refs/heads/master
Commit: 0c23e254c38d4a9210939e1e1b0074278568abed
Parents: e646ae6
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri May 4 09:27:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri May 4 09:27:14 2018 +0800
----------------------------------------------------------------------
.../datasources/parquet/ParquetFileFormat.scala | 3 ++-
.../datasources/parquet/ParquetFilters.scala | 15 +++++++--------
.../datasources/parquet/ParquetFilterSuite.scala | 10 ++++++----
3 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d8f47ee..d1f9e11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -342,6 +342,7 @@ class ParquetFileFormat
sparkSession.sessionState.conf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -352,7 +353,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
- .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+ .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index ccc8306..3106261 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -25,14 +25,13 @@ import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
-private[parquet] object ParquetFilters {
+private[parquet] class ParquetFilters(pushDownDate: Boolean) {
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
@@ -59,7 +58,7 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -85,7 +84,7 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -108,7 +107,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -131,7 +130,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -154,7 +153,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -177,7 +176,7 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if SQLConf.get.parquetFilterPushDownDate =>
+ case DateType if pushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
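To show the shape of the new class-based API in one place, a hypothetical usage sketch follows. `ParquetFilters` is `private[parquet]`, so code like this only compiles inside that package (e.g. in the test suite below); the schema and predicate are made up for illustration.

```scala
import java.sql.Date

import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{DateType, StructField, StructType}

val schema = StructType(Seq(StructField("d", DateType)))
val predicate = sources.GreaterThan("d", Date.valueOf("2018-05-04"))

// The pushdown flag is now a constructor argument instead of a SQLConf.get lookup.
val withPushDown = new ParquetFilters(pushDownDate = true).createFilter(schema, predicate)
// withPushDown is Some(...): the date predicate is converted to a Parquet filter.

val withoutPushDown = new ParquetFilters(pushDownDate = false).createFilter(schema, predicate)
// withoutPushDown is None: date filters are not pushed down when the flag is off.
```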
http://git-wip-us.apache.org/repos/asf/spark/blob/0c23e254/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 1d3476e..667e0b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,6 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
*/
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
+ private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
+
override def beforeEach(): Unit = {
super.beforeEach()
// Note that there are many tests here that require record-level filtering set to be true.
@@ -99,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(selectedFilters.nonEmpty, "No filter is pushed down")
selectedFilters.foreach { pred =>
- val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+ val maybeFilter = parquetFilters.createFilter(df.schema, pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
maybeFilter.exists(_.getClass === filterClass)
@@ -517,7 +519,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
lt(intColumn("a"), 10: Integer),
gt(doubleColumn("c"), 1.5: java.lang.Double)))
) {
- ParquetFilters.createFilter(
+ parquetFilters.createFilter(
schema,
sources.And(
sources.LessThan("a", 10),
@@ -525,7 +527,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
assertResult(None) {
- ParquetFilters.createFilter(
+ parquetFilters.createFilter(
schema,
sources.And(
sources.LessThan("a", 10),
@@ -533,7 +535,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
assertResult(None) {
- ParquetFilters.createFilter(
+ parquetFilters.createFilter(
schema,
sources.Not(
sources.And(