You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/07/04 12:15:57 UTC

spark git commit: [SPARK-24716][SQL] Refactor ParquetFilters

Repository: spark
Updated Branches:
  refs/heads/master b2deef64f -> 021145f36


[SPARK-24716][SQL] Refactor ParquetFilters

## What changes were proposed in this pull request?

Replace DataFrame schema to Parquet file schema when create `ParquetFilters`.
Thus we can easily implement `Decimal` and `Timestamp` push down. some thing like this:
```scala
// DecimalType: 32BitDecimalType
case ParquetSchemaType(DECIMAL, INT32, decimal)
  if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
      .asInstanceOf[Integer]).orNull)
// DecimalType: 64BitDecimalType
case ParquetSchemaType(DECIMAL, INT64, decimal)
  if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
      .asInstanceOf[java.lang.Long]).orNull)
// DecimalType: LegacyParquetFormat 32BitDecimalType & 64BitDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
  if pushDownDecimal && decimal.getPrecision <= Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledLong(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)
// DecimalType: ByteArrayDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
  if pushDownDecimal && decimal.getPrecision > Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledBytes(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)
```

```scala
// INT96 doesn't support pushdown
case ParquetSchemaType(TIMESTAMP_MICROS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
      .asInstanceOf[java.lang.Long]).orNull)
case ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[java.lang.Long]).orNull)
```

## How was this patch tested?

unit tests

Author: Yuming Wang <yu...@ebay.com>

Closes #21696 from wangyum/SPARK-24716.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/021145f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/021145f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/021145f3

Branch: refs/heads/master
Commit: 021145f36432b386cce30450c888a85393d5169f
Parents: b2deef6
Author: Yuming Wang <yu...@ebay.com>
Authored: Wed Jul 4 20:15:40 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jul 4 20:15:40 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala |  34 ++--
 .../datasources/parquet/ParquetFilters.scala    | 173 ++++++++++---------
 .../org/apache/spark/sql/sources/filters.scala  |   2 +-
 .../parquet/ParquetFilterSuite.scala            |  13 +-
 4 files changed, 121 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 93de1fa..52a18ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -353,25 +353,13 @@ class ParquetFileFormat
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
-      // Try to push down filters when filter push-down is enabled.
-      val pushed = if (enableParquetFilterPushDown) {
-        filters
-          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-          // is used here.
-          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
-          .createFilter(requiredSchema, _))
-          .reduceOption(FilterApi.and)
-      } else {
-        None
-      }
-
       val fileSplit =
         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+      val filePath = fileSplit.getPath
 
       val split =
         new org.apache.parquet.hadoop.ParquetInputSplit(
-          fileSplit.getPath,
+          filePath,
           fileSplit.getStart,
           fileSplit.getStart + fileSplit.getLength,
           fileSplit.getLength,
@@ -379,12 +367,28 @@ class ParquetFileFormat
           null)
 
       val sharedConf = broadcastedHadoopConf.value.value
+
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
+          .getFileMetaData.getSchema
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
+          .createFilter(parquetSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
       // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
       // *only* if the file was created by something other than "parquet-mr", so check the actual
       // writer here for this file.  We have to do this per-file, as each file in the table may
       // have different writers.
       def isCreatedByParquetMr(): Boolean = {
-        val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS)
+        val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
         footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
       }
       val convertTz =

http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 21c9e2e..4827f70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -35,171 +39,180 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
 
+  private case class ParquetSchemaType(
+      originalType: OriginalType,
+      primitiveTypeName: PrimitiveTypeName,
+      decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    case ParquetBinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    case ParquetDateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    case ParquetBinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    case ParquetDateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
+  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.lt(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.ltEq(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.gt(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.gtEq(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
   /**
    * Returns a map from name of the column to the data type, if predicate push down applies.
    */
-  private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
-    case StructType(fields) =>
+  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
+    case m: MessageType =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.map(f => f.name -> f.dataType).toMap
-    case _ => Map.empty[String, DataType]
+      m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetSchemaType(
+          f.getOriginalType, f.getPrimitiveTypeName, f.getDecimalMetadata)
+      }.toMap
+    case _ => Map.empty[String, ParquetSchemaType]
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
-  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
+  def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
     // Parquet does not allow dots in the column name because dots are used as a column path

http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 2499e9b..bdd8c4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -199,7 +199,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
 
 /**
  * A filter that evaluates to `true` iff the attribute evaluates to
- * a string that starts with `value`.
+ * a string that ends with `value`.
  *
  * @since 1.3.1
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/021145f3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d9ae585..8b96c84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -103,7 +103,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         assert(selectedFilters.nonEmpty, "No filter is pushed down")
 
         selectedFilters.foreach { pred =>
-          val maybeFilter = parquetFilters.createFilter(df.schema, pred)
+          val maybeFilter = parquetFilters.createFilter(
+            new SparkToParquetSchemaConverter(conf).convert(df.schema), pred)
           assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
           // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
           maybeFilter.exists(_.getClass === filterClass)
@@ -542,12 +543,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       StructField("c", DoubleType, nullable = true)
     ))
 
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
     assertResult(Some(and(
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
     ) {
       parquetFilters.createFilter(
-        schema,
+        parquetSchema,
         sources.And(
           sources.LessThan("a", 10),
           sources.GreaterThan("c", 1.5D)))
@@ -555,7 +558,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
     assertResult(None) {
       parquetFilters.createFilter(
-        schema,
+        parquetSchema,
         sources.And(
           sources.LessThan("a", 10),
           sources.StringContains("b", "prefix")))
@@ -563,7 +566,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
     assertResult(None) {
       parquetFilters.createFilter(
-        schema,
+        parquetSchema,
         sources.Not(
           sources.And(
             sources.GreaterThan("a", 1),
@@ -729,7 +732,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
       assertResult(None) {
         parquetFilters.createFilter(
-          df.schema,
+          new SparkToParquetSchemaConverter(conf).convert(df.schema),
           sources.StringStartsWith("_1", null))
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org