You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/09 19:28:31 UTC

[GitHub] [spark] sunchao commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

sunchao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802937583



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +358,7 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+

Review comment:
       nit: unrelated change 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -242,29 +297,43 @@ object ParquetReadSupport {
       // "_tuple" appended then the repeated type is the element type and elements are required.
       // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
       // only field.
-      if (
+      val newParquetList = if (
         repeatedGroup.getFieldCount > 1 ||
         repeatedGroup.getName == "array" ||
         repeatedGroup.getName == parquetList.getName + "_tuple"
       ) {
         Types
           .buildGroup(parquetList.getRepetition)
           .as(LogicalTypeAnnotation.listType())
-          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
+          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive, useFieldId))
           .named(parquetList.getName)
       } else {
+        val newRepeatedGroup = Types
+          .repeatedGroup()
+          .addField(
+            clipParquetType(
+              repeatedGroup.getType(0), elementType, caseSensitive, useFieldId))
+          .named(repeatedGroup.getName)
+
+        val newElementType = if (useFieldId && repeatedGroup.getId() != null) {

Review comment:
       nit: I think we can use `getId` here (without the parentheses)

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +446,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {

Review comment:
       I think this can be simplified:
   ```scala
       val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
       structType.map { f =>
         if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
           matchIdField(f)
         } else if (caseSensitive) {
           matchCaseSensitiveField(f)
         } else {
           matchCaseInsensitiveField(f)
         }
       }
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {

Review comment:
       maybe we can consider combining `getFieldId` and `hasFieldId` into a single method:
   ```
   def getFieldId(field: StructField): Option[Int]
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,39 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex: Map[String, Int] = catalystType.fieldNames.zipWithIndex.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }
+          .map { case (f, idx) => (ParquetUtils.getFieldId(f), idx) }
+          .toMap
+      } else {
+        Map.empty[Int, Int]
+      }
+
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
-      val catalystField = catalystType(fieldIndex)
+      val catalystFieldIndex = Option(parquetField.getId).map { fieldId =>

Review comment:
       nit: I think we can use `flatMap` here:
   ```scala
   Option(parquetField.getId).flatMap { fieldId =>
           // field has id, try to match by id first before falling back to match by name
           catalystFieldIdxByFieldId.get(fieldId.intValue())
         }.getOrElse {
           // field doesn't have an id, or no catalyst field matches its id: match by name
           catalystFieldIdxByName(parquetField.getName)
         }
   ```
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {

Review comment:
       can we add some doc above describing how this parameter is used?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
+        "Parquet writers will populate the field Id " +
+        "metadata (if present) in the Spark schema to the Parquet schema.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
+        "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
+        "fields instead of using column names")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")

Review comment:
       should this be `spark.sql.parquet.fieldId.read.ignoreMissing`? I wonder whether it will be used on the write path.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      caseSensitive: Boolean,
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
     val parquetValueType = repeatedGroup.getType(1)
 
-    val clippedRepeatedGroup =
-      Types
+    val clippedRepeatedGroup = {
+      val newRepeatedGroup = Types
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive, useFieldId))
         .named(repeatedGroup.getName)
+      if (useFieldId && repeatedGroup.getId != null) {
+        newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+      } else {
+        newRepeatedGroup
+      }
+    }
 
-    Types
+    val newMap = Types
       .buildGroup(parquetMap.getRepetition)
       .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
+
+    if (useFieldId && parquetMap.getId() != null) {

Review comment:
       can we move this into `clipParquetType`? 
   ```scala
     private def clipParquetType(
         parquetType: Type,
         catalystType: DataType,
         caseSensitive: Boolean,
         useFieldId: Boolean): Type = {
       val newParquetType = catalystType match {
         ...
       }
       if (useFieldId && newParquetType.getId != null) {
         newParquetType.withId(parquetType.getId.intValue())
       } else {
         newParquetType
       }
     }
   ```
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -109,6 +169,7 @@ class ParquetReadSupport(
       // in parquetRequestedSchema which are not present in the file.
       parquetClippedSchema
     }
+

Review comment:
       ditto (nit: unrelated change)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org