You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/03 22:33:43 UTC

[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799006839



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +355,13 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+
+        if (SQLConf.get.parquetFieldIdEnabled &&
+            ParquetUtils.hasFieldIds(requiredSchema)) {
+          throw new IOException("Parquet-mr reader does not support schema with field IDs." +
+            s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")

Review comment:
       Maybe we should recommend disabling the flag instead of, or in addition to, choosing a different Parquet reader?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       I am wondering whether there should be two flags, or a single flag for reads only that is switched to false, to enable easier migration.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {

Review comment:
       I believe there is already a method for this in ParquetTest.scala; otherwise, it is fine to keep it as is.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {

Review comment:
       Can we change it to "Parquet reads infer fields using field ids correctly" or something like that? We need to make sure it is clear that we write random field names and infer fields using ids instead. Maybe you can add a small comment to clarify.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      "The key `parquet.field.id` doesn't exist in the metadata of " + field)

Review comment:
       Can we use FIELD_ID_METADATA_KEY here?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.

Review comment:
       We should just use the vectorised reader here; there is only one.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident
+        // We need this name to be unique as well, otherwise there will be type conflicts

Review comment:
       I am not quite sure I follow this statement. Can you elaborate on what you mean by assigning a fake name here? What kind of fields will fall into this category? Do fake ids need to be deterministic?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
+      useFieldId: Boolean,

Review comment:
       Is it possible to not change the order of the fields and add `useFieldId` at the end of the list of arguments instead?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       Can we extend the error message to hint at how to fix the error?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       I am inclined to keep this config as false by default

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       How does it work when there is a mixture of columns that have field id set and ones that don't?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -324,23 +401,29 @@ object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
-    val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
-      val caseSensitiveParquetFieldMap =
+      parquetRecord: GroupType,
+      structType: StructType,
+      useFieldId: Boolean,
+      caseSensitive: Boolean): Seq[Type] = {
+    val toParquet = new SparkToParquetSchemaConverter(
+      writeLegacyParquetFormat = false, useFieldId = useFieldId)
+    lazy val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-      structType.map { f =>
-        caseSensitiveParquetFieldMap
+    lazy val caseInsensitiveParquetFieldMap =
+        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
+    lazy val idToParquetFieldMap =

Review comment:
       Is it possible for fields to have the same field id?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident

Review comment:
       nit: indentation needs to be fixed here.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)

Review comment:
       nit: indentation is wrong.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)

Review comment:
       What happens if you read data without providing a schema? Will the schema be ["random", "name"] or ["name", "random"]?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org