Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/08 09:57:59 UTC

[GitHub] [spark] martin-g commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

martin-g commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801428949



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      caseSensitive: Boolean,

Review comment:
       Is the API break allowed here?
   Maybe it would be better to add an overloaded method instead?
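   For illustration, a minimal sketch of what such an overload could look like. This is hypothetical: the diff above truncates the new parameter list after `caseSensitive`, so the delegation below assumes any trailing parameters still carry defaults.
   ```scala
   // Hypothetical compatibility overload preserving the old two-argument
   // call sites; delegates to the new method with the previous default.
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType): MessageType =
     clipParquetSchema(parquetSchema, catalystSchema, caseSensitive = true)
   ```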

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)

Review comment:
       Why not just use `new HashMap<String, String>()`?
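   For comparison, a rough sketch of the two forms; both produce an empty `java.util.Map[String, String]`, the main difference being the wrapper noted in the comment:
   ```scala
   import java.util.{HashMap => JHashMap}
   import scala.collection.JavaConverters._

   // `asJava` wraps the immutable Scala map in an unmodifiable Java view,
   // while the direct constructor allocates a plain mutable HashMap.
   val viaWrapper: java.util.Map[String, String] = Map.empty[String, String].asJava
   val direct: java.util.Map[String, String] = new JHashMap[String, String]()
   ```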

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,42 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex =
+      catalystType.fields.zipWithIndex.map { case (f, idx) =>
+          (f.name, idx)
+        }.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }

Review comment:
       Is this filter needed?
   `ParquetUtils.hasFieldIds(catalystType)` at line 219 already checks recursively.
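   For reference, a hypothetical schema where the two checks diverge (assuming `parquet.field.id` is the metadata key the helpers look up): `hasFieldIds(schema)` is true because at least one field carries an id, yet the per-field `hasFieldId(f)` filter still drops the fields that do not.
   ```scala
   import org.apache.spark.sql.types._

   // `b` carries a field id, `a` does not; hasFieldIds(schema) returns true,
   // but without the per-field filter `a` would be indexed with no id.
   val schema = StructType(Seq(
     StructField("a", IntegerType),
     StructField("b", IntegerType, nullable = true,
       metadata = new MetadataBuilder().putLong("parquet.field.id", 1L).build())
   ))
   ```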

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
-    outputTimestampType = conf.parquetOutputTimestampType)
+    outputTimestampType = conf.parquetOutputTimestampType,
+    useFieldId = conf.parquetFieldIdWriteEnabled)
 
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
-      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
+      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
+    useFieldId = SQLConf.get.parquetFieldIdWriteEnabled)

Review comment:
       It is inconsistent with the other settings above.
   I am not sure why, but the others use `conf.get(SQLConf.X_Y_Z.key).toAbc`.
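   For reference, the consistent style would look roughly like this (a sketch; it assumes the key is actually present in the passed-in Hadoop `Configuration`, since `Configuration.get` returns null otherwise):
   ```scala
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
       conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
     // Reads from the passed-in Configuration rather than the thread-local SQLConf.get
     useFieldId = conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean)
   ```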

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,30 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled," +
+        " Parquet writers will populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")
+      .doc("When the Parquet file doesn't have any field IDs but the" +
+        " Spark read schema is using field IDs to read, we will silently return nulls" +
+        "when this flag is enabled, or error otherwise.")

Review comment:
       Missing leading space before `when`.
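   For concreteness, the corrected doc string would read:
   ```scala
   .doc("When the Parquet file doesn't have any field IDs but the" +
     " Spark read schema is using field IDs to read, we will silently return nulls" +
     " when this flag is enabled, or error otherwise.")
   ```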

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,42 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex =
+      catalystType.fields.zipWithIndex.map { case (f, idx) =>
+          (f.name, idx)
+        }.toMap

Review comment:
       Isn't `catalystType.fieldNames.zipWithIndex.toMap` the same, but with fewer lines?
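   For comparison, the two forms build the same `Map[String, Int]`, since `StructType.fieldNames` is just `fields.map(_.name)`:
   ```scala
   // Expanded form from the diff:
   val byExpandedForm =
     catalystType.fields.zipWithIndex.map { case (f, idx) => (f.name, idx) }.toMap
   // Equivalent one-liner:
   val byFieldNames = catalystType.fieldNames.zipWithIndex.toMap
   ```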

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,30 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled," +
+        " Parquet writers will populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf

Review comment:
       Does it need `.version("3.3.0")`?
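   For reference, the usual pattern in `SQLConf` places `.version(...)` between `.doc(...)` and the type builder, e.g. (a sketch, assuming 3.3.0 is the target release for this change):
   ```scala
   val PARQUET_FIELD_ID_WRITE_ENABLED =
     buildConf("spark.sql.parquet.fieldId.write.enabled")
       .doc("Field ID is a native field of the Parquet schema spec. ...")
       .version("3.3.0")  // assumed target release for this PR
       .booleanConf
       .createWithDefault(true)
   ```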




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


