You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/04/08 13:26:25 UTC

[spark] branch master updated: [SPARK-25407][SQL] Allow nested access for non-existent field for Parquet file when nested pruning is enabled

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 215609d  [SPARK-25407][SQL] Allow nested access for non-existent field for Parquet file when nested pruning is enabled
215609d is described below

commit 215609def22da14c464b37374ceae4f53a39a145
Author: Michael Allman <ms...@allman.ms>
AuthorDate: Mon Apr 8 22:26:02 2019 +0900

    [SPARK-25407][SQL] Allow nested access for non-existent field for Parquet file when nested pruning is enabled
    
    ## What changes were proposed in this pull request?
    
    As part of schema clipping in `ParquetReadSupport.scala`, we add fields in the Catalyst requested schema which are missing from the Parquet file schema to the Parquet clipped schema. However, nested schema pruning requires we ignore unrequested field data when reading from a Parquet file. Therefore we pass two schema to `ParquetRecordMaterializer`: the schema of the file data we want to read and the schema of the rows we want to return. The reader is responsible for reconciling the di [...]
    
    Aside from checking whether schema pruning is enabled, there is an additional complication to constructing the Parquet requested schema. The manner in which Spark's two Parquet readers reconcile the differences between the Parquet requested schema and the Catalyst requested schema differ. Spark's vectorized reader does not (currently) support reading Parquet files with complex types in their schema. Further, it assumes that the Parquet requested schema includes all fields requested in [...]
    
    Spark's parquet-mr based reader supports reading Parquet files of any kind of complex schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the parquet-mr reader requires that the Parquet requested schema include only those fields present in the underlying Parquet file's schema. Therefore, in the case where we use the parquet-mr reader we intersect the Parquet clipped schema with the Parquet file's schema to construct the Parquet requested schema that's  [...]
    
    _Additional description (by HyukjinKwon):_
    
    Let's suppose that we have a Parquet schema as below:
    
    ```
    message spark_schema {
      required int32 id;
      optional group name {
        optional binary first (UTF8);
        optional binary last (UTF8);
      }
      optional binary address (UTF8);
    }
    ```
    
    Currently, the clipped schema as follows:
    
    ```
    message spark_schema {
      optional group name {
        optional binary middle (UTF8);
      }
      optional binary address (UTF8);
    }
    ```
    
    Parquet MR does not support access to the nested non-existent field (`name.middle`).
    
    To workaround this, this PR removes `name.middle` request at all to Parquet reader as below:
    
    ```
    Parquet requested schema:
    message spark_schema {
      optional binary address (UTF8);
    }
    ```
    
    and produces the record (`name.middle`) properly as the requested Catalyst schema.
    
    ```
    root
    -- name: struct (nullable = true)
        |-- middle: string (nullable = true)
    -- address: string (nullable = true)
    ```
    
    I think technically this is what Parquet library should support since Parquet library made a design decision to produce `null` for non-existent fields IIRC. This PR targets to work around it.
    
    ## How was this patch tested?
    
    A previously ignored test case which exercises the failure scenario this PR addresses has been enabled.
    
    This closes #22880
    
    Closes #24307 from dongjoon-hyun/SPARK-25407.
    
    Lead-authored-by: Michael Allman <ms...@allman.ms>
    Co-authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  8 +-
 .../datasources/parquet/ParquetReadSupport.scala   | 88 +++++++++++++++++-----
 .../datasources/parquet/ParquetRowConverter.scala  | 21 ++++--
 .../execution/datasources/SchemaPruningSuite.scala |  2 +-
 4 files changed, 88 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index efa4f3f..e37f228 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -311,6 +311,9 @@ class ParquetFileFormat
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
       SQLConf.CASE_SENSITIVE.key,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
@@ -424,11 +427,12 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
+        val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[UnsafeRow](readSupport)
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before `initialization`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 3319e73..df77665 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -49,15 +49,16 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
-    extends ReadSupport[UnsafeRow] with Logging {
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+    enableVectorizedReader: Boolean)
+  extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  But that is only
     // used in the vectorized reader, where we get the convertTz value directly, and the value here
     // is ignored.
-    this(None)
+    this(None, enableVectorizedReader = true)
   }
 
   /**
@@ -65,18 +66,48 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
    * readers.  Responsible for figuring out Parquet requested schema used for column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
-    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
-      context.getFileSchema, catalystRequestedSchema, caseSensitive)
-
+    val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
+    val parquetFileSchema = context.getFileSchema
+    val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
+      catalystRequestedSchema, caseSensitive)
+
+    // We pass two schema to ParquetRecordMaterializer:
+    // - parquetRequestedSchema: the schema of the file data we want to read
+    // - catalystRequestedSchema: the schema of the rows we want to return
+    // The reader is responsible for reconciling the differences between the two.
+    val parquetRequestedSchema = if (schemaPruningEnabled && !enableVectorizedReader) {
+      // Parquet-MR reader requires that parquetRequestedSchema include only those fields present
+      // in the underlying parquetFileSchema. Therefore, we intersect the parquetClippedSchema
+      // with the parquetFileSchema
+      ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
+        .map(groupType => new MessageType(groupType.getName, groupType.getFields))
+        .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+    } else {
+      // Spark's vectorized reader only support atomic types currently. It also skip fields
+      // in parquetRequestedSchema which are not present in the file.
+      parquetClippedSchema
+    }
+    logDebug(
+      s"""Going to read the following fields from the Parquet file with the following schema:
+         |Parquet file schema:
+         |$parquetFileSchema
+         |Parquet clipped schema:
+         |$parquetClippedSchema
+         |Parquet requested schema:
+         |$parquetRequestedSchema
+         |Catalyst requested schema:
+         |${catalystRequestedSchema.treeString}
+       """.stripMargin)
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
 
@@ -90,19 +121,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema
-
-    logInfo {
-      s"""Going to read the following fields from the Parquet file:
-         |
-         |Parquet form:
-         |$parquetRequestedSchema
-         |Catalyst form:
-         |$catalystRequestedSchema
-       """.stripMargin
-    }
-
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
@@ -322,6 +341,35 @@ private[parquet] object ParquetReadSupport {
     }
   }
 
+  /**
+   * Computes the structural intersection between two Parquet group types.
+   * This is used to create a requestedSchema for ReadContext of Parquet-MR reader.
+   * Parquet-MR reader does not support the nested field access to non-existent field
+   * while parquet library does support to read the non-existent field by regular field access.
+   */
+  private def intersectParquetGroups(
+      groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
+    val fields =
+      groupType1.getFields.asScala
+        .filter(field => groupType2.containsField(field.getName))
+        .flatMap {
+          case field1: GroupType =>
+            val field2 = groupType2.getType(field1.getName)
+            if (field2.isPrimitive) {
+              None
+            } else {
+              intersectParquetGroups(field1, field2.asGroupType)
+            }
+          case field1 => Some(field1)
+        }
+
+    if (fields.nonEmpty) {
+      Some(groupType1.withNewFields(fields.asJava))
+    } else {
+      None
+    }
+  }
+
   def expandUDT(schema: StructType): StructType = {
     def expand(dataType: DataType): DataType = {
       dataType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 004a96d..b772b6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter(
   extends ParquetGroupConverter(updater) with Logging {
 
   assert(
-    parquetType.getFieldCount == catalystType.length,
-    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+    parquetType.getFieldCount <= catalystType.length,
+    s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
        |
        |Parquet schema:
        |$parquetType
@@ -182,10 +182,11 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
+    parquetType.getFields.asScala.map { parquetField =>
+      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val catalystField = catalystType(fieldIndex)
+      // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
+      newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
     }.toArray
   }
 
@@ -193,7 +194,7 @@ private[parquet] class ParquetRowConverter(
 
   override def end(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.end()
       i += 1
     }
@@ -203,10 +204,14 @@ private[parquet] class ParquetRowConverter(
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
-      fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    i = 0
+    while (i < fieldConverters.length) {
+      fieldConverters(i).updater.start()
+      i += 1
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index b0314e6..22317fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -135,7 +135,7 @@ abstract class SchemaPruningSuite
       Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
   }
 
-  ignore("partial schema intersection - select missing subfield") {
+  testSchemaPruning("partial schema intersection - select missing subfield") {
     val query = sql("select name.middle, address from contacts where p=2")
     checkScan(query, "struct<name:struct<middle:string>,address:string>")
     checkAnswer(query.orderBy("id"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org