You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/07/04 12:15:57 UTC
spark git commit: [SPARK-24716][SQL] Refactor ParquetFilters
Repository: spark
Updated Branches:
refs/heads/master b2deef64f -> 021145f36
[SPARK-24716][SQL] Refactor ParquetFilters
## What changes were proposed in this pull request?
Replace the DataFrame schema with the Parquet file schema when creating `ParquetFilters`.
Thus we can easily implement `Decimal` and `Timestamp` push down, something like this:
```scala
// DecimalType: 32BitDecimalType
case ParquetSchemaType(DECIMAL, INT32, decimal)
if pushDownDecimal =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
.asInstanceOf[Integer]).orNull)
// DecimalType: 64BitDecimalType
case ParquetSchemaType(DECIMAL, INT64, decimal)
if pushDownDecimal =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
.asInstanceOf[java.lang.Long]).orNull)
// DecimalType: LegacyParquetFormat 32BitDecimalType & 64BitDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
if pushDownDecimal && decimal.getPrecision <= Decimal.MAX_LONG_DIGITS =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(d => decimalToBinaryUsingUnscaledLong(decimal.getPrecision,
d.asInstanceOf[JBigDecimal])).orNull)
// DecimalType: ByteArrayDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
if pushDownDecimal && decimal.getPrecision > Decimal.MAX_LONG_DIGITS =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(d => decimalToBinaryUsingUnscaledBytes(decimal.getPrecision,
d.asInstanceOf[JBigDecimal])).orNull)
```
```scala
// INT96 doesn't support pushdown
case ParquetSchemaType(TIMESTAMP_MICROS, INT64, null) =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[java.lang.Long]).orNull)
case ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null) =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[java.lang.Long]).orNull)
```
## How was this patch tested?
unit tests
Author: Yuming Wang <yu...@ebay.com>
Closes #21696 from wangyum/SPARK-24716.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/021145f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/021145f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/021145f3
Branch: refs/heads/master
Commit: 021145f36432b386cce30450c888a85393d5169f
Parents: b2deef6
Author: Yuming Wang <yu...@ebay.com>
Authored: Wed Jul 4 20:15:40 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jul 4 20:15:40 2018 +0800
----------------------------------------------------------------------
.../datasources/parquet/ParquetFileFormat.scala | 34 ++--
.../datasources/parquet/ParquetFilters.scala | 173 ++++++++++---------
.../org/apache/spark/sql/sources/filters.scala | 2 +-
.../parquet/ParquetFilterSuite.scala | 13 +-
4 files changed, 121 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 93de1fa..52a18ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -353,25 +353,13 @@ class ParquetFileFormat
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
- .createFilter(requiredSchema, _))
- .reduceOption(FilterApi.and)
- } else {
- None
- }
-
val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+ val filePath = fileSplit.getPath
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
- fileSplit.getPath,
+ filePath,
fileSplit.getStart,
fileSplit.getStart + fileSplit.getLength,
fileSplit.getLength,
@@ -379,12 +367,28 @@ class ParquetFileFormat
null)
val sharedConf = broadcastedHadoopConf.value.value
+
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
+ .getFileMetaData.getSchema
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
+ .createFilter(parquetSchema, _))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
def isCreatedByParquetMr(): Boolean = {
- val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS)
+ val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
}
val convertTz =
http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 21c9e2e..4827f70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.sql.Date
+import scala.collection.JavaConverters.asScalaBufferConverter
+
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -35,171 +39,180 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
+ private case class ParquetSchemaType(
+ originalType: OriginalType,
+ primitiveTypeName: PrimitiveTypeName,
+ decimalMetadata: DecimalMetadata)
+
+ private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+ private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+ private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+ private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+ private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+ private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+ private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+ private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}
- private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case BooleanType =>
+ private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// Binary.fromString and Binary.fromByteArray don't accept null values
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case BooleanType =>
+ private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
+ private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.lt(
- intColumn(n),
- Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.ltEq(
- intColumn(n),
- Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.gt(
- intColumn(n),
- Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.gtEq(
- intColumn(n),
- Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
/**
* Returns a map from name of the column to the data type, if predicate push down applies.
*/
- private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
- case StructType(fields) =>
+ private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
+ case m: MessageType =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
- fields.map(f => f.name -> f.dataType).toMap
- case _ => Map.empty[String, DataType]
+ m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+ f.getName -> ParquetSchemaType(
+ f.getOriginalType, f.getPrimitiveTypeName, f.getDecimalMetadata)
+ }.toMap
+ case _ => Map.empty[String, ParquetSchemaType]
}
/**
* Converts data sources filters to Parquet filter predicates.
*/
- def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
+ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToType = getFieldMap(schema)
// Parquet does not allow dots in the column name because dots are used as a column path
http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 2499e9b..bdd8c4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -199,7 +199,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
/**
* A filter that evaluates to `true` iff the attribute evaluates to
- * a string that starts with `value`.
+ * a string that ends with `value`.
*
* @since 1.3.1
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d9ae585..8b96c84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -103,7 +103,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(selectedFilters.nonEmpty, "No filter is pushed down")
selectedFilters.foreach { pred =>
- val maybeFilter = parquetFilters.createFilter(df.schema, pred)
+ val maybeFilter = parquetFilters.createFilter(
+ new SparkToParquetSchemaConverter(conf).convert(df.schema), pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
maybeFilter.exists(_.getClass === filterClass)
@@ -542,12 +543,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
StructField("c", DoubleType, nullable = true)
))
+ val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
assertResult(Some(and(
lt(intColumn("a"), 10: Integer),
gt(doubleColumn("c"), 1.5: java.lang.Double)))
) {
parquetFilters.createFilter(
- schema,
+ parquetSchema,
sources.And(
sources.LessThan("a", 10),
sources.GreaterThan("c", 1.5D)))
@@ -555,7 +558,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assertResult(None) {
parquetFilters.createFilter(
- schema,
+ parquetSchema,
sources.And(
sources.LessThan("a", 10),
sources.StringContains("b", "prefix")))
@@ -563,7 +566,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assertResult(None) {
parquetFilters.createFilter(
- schema,
+ parquetSchema,
sources.Not(
sources.And(
sources.GreaterThan("a", 1),
@@ -729,7 +732,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assertResult(None) {
parquetFilters.createFilter(
- df.schema,
+ new SparkToParquetSchemaConverter(conf).convert(df.schema),
sources.StringStartsWith("_1", null))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org